Disclaimer: I have no idea if this is a good solution in any means
// https://forums.swift.org/t/sendablekeypath/67195/8
#if compiler(>=6)
public typealias _SendableKeyPath<Root, Value> = any KeyPath<Root, Value> & Sendable
public typealias _SendableWritableKeyPath<Root, Value> = any WritableKeyPath<Root, Value>
& Sendable
#else
public typealias _SendableKeyPath<Root, Value> = KeyPath<Root, Value>
public typealias _SendableWritableKeyPath<Root, Value> = WritableKeyPath<Root, Value>
#endif
func sendableKeyPath<Root, Value>(
_ keyPath: KeyPath<Root, Value>
) -> _SendableKeyPath<Root, Value> {
unsafeBitCast(keyPath, to: _SendableKeyPath<Root, Value>.self)
}
func sendableKeyPath<Root, Value>(
_ keyPath: WritableKeyPath<Root, Value>
) -> _SendableWritableKeyPath<Root, Value> {
unsafeBitCast(keyPath, to: _SendableWritableKeyPath<Root, Value>.self)
}
extension Observable {
@MainActor
func observe<V>(keyPath: KeyPath<Self, V>) -> AsyncStream<V> where KeyPath<Self, V>: Sendable {
let keyPath = sendableKeyPath(keyPath)
let (stream, continuation) = AsyncStream.makeStream(of: V.self)
let isCancelled = Mutex(false)
@Sendable func _observe() {
withObservationTracking({
return self[keyPath: keyPath]
}, onChange: {
if isCancelled.withLock({ $0 }) {
return
}
DispatchQueue.main.async {
if isCancelled.withLock({ $0 }) {
return
}
continuation.yield(self[keyPath: keyPath])
_observe()
}
})
}
_observe()
continuation.onTermination = { _ in
isCancelled.withLock { $0 = true }
}
defer {
continuation.yield(self[keyPath: keyPath])
}
return stream
}
}
@Observable
@MainActor
final class Model {
var count = 0
}
@MainActor
final class PlaygroundTests: XCTestCase {
func test_observe() async {
print("start", #function)
let model = Model()
let task = Task {
var result: [Int] = []
for await item in model.observe(keyPath: \.count) {
result.append(item)
}
return result
}
let max = 10
for _ in 1...max {
model.count += 1
print(model.count)
try? await Task.sleep(for: .seconds(0.01))
}
task.cancel()
let values = await task.value
print("values", values)
XCTAssertEqual(values.count, max) // ✅
XCTAssertEqual(values, Array(1...max)) // ✅
}
}
This is based on the @mbrandonw:
- Tracking properties in `@Observable` models internally - #10 by mbrandonw
- SendableKeyPath? - #8 by mbrandonw (
KeyPath
Sendable
workaround)
The main difference from the gist is that the observation makes sense only for the @Observable
models annotated with @MainActor
. This way based on DispatchQueue.main
it should deliver a reliable stream.
But still, the biggest issue is that there is a hop which is required to recursively observe the property and due to the lack of publicity available withObservationTracking(_ apply: willSet:didSet:)
variant. That might hit you hard based on the assumption that you make around the observation greatly explained by @tcldr:
- [Accepted with revision] SE-0395: Observability - #4 by tcldr
- [Accepted with revision] SE-0395: Observability - #19 by tcldr
As @Philippe_Hausler noted in the revised version of the Observable proposal, it was mentioned as a potential candidate for future involvement, though unfortunately, this has not come to pass yet.
However, in the case of the AsyncStream
delivery as an observation, it already introduce concurrency on every next()
item.