Two approaches to observing an @Observable object outside of a View

Disclaimer: I have no idea whether this is a good solution by any means.

// https://forums.swift.org/t/sendablekeypath/67195/8
//
// Key-path aliases that add a `Sendable` requirement when the compiler is
// version 6 or newer, and fall back to the plain key-path classes otherwise.
// Code written against these aliases therefore compiles on both toolchains.
#if compiler(>=6)
/// A read-only key path from `Root` to `Value` that is also `Sendable`.
public typealias _SendableKeyPath<Root, Value> = any KeyPath<Root, Value> & Sendable
/// A writable key path from `Root` to `Value` that is also `Sendable`.
public typealias _SendableWritableKeyPath<Root, Value> = any WritableKeyPath<Root, Value>
& Sendable
#else
/// Pre-6 compilers: the alias is the plain `KeyPath`, with no `Sendable` requirement.
public typealias _SendableKeyPath<Root, Value> = KeyPath<Root, Value>
/// Pre-6 compilers: the alias is the plain `WritableKeyPath`, with no `Sendable` requirement.
public typealias _SendableWritableKeyPath<Root, Value> = WritableKeyPath<Root, Value>
#endif

/// Reinterprets a read-only key path as `_SendableKeyPath`.
///
/// The conversion is an `unsafeBitCast`, so nothing is verified at runtime:
/// the caller vouches that sharing `keyPath` across concurrency domains is safe.
///
/// - Parameter keyPath: The key path to reinterpret.
/// - Returns: The same key path, typed as `_SendableKeyPath<Root, Value>`.
func sendableKeyPath<Root, Value>(
    _ keyPath: KeyPath<Root, Value>
) -> _SendableKeyPath<Root, Value> {
    let reinterpreted: _SendableKeyPath<Root, Value> = unsafeBitCast(
        keyPath,
        to: _SendableKeyPath<Root, Value>.self
    )
    return reinterpreted
}

/// Reinterprets a writable key path as `_SendableWritableKeyPath`.
///
/// The conversion is an `unsafeBitCast`, so nothing is verified at runtime:
/// the caller vouches that sharing `keyPath` across concurrency domains is safe.
///
/// - Parameter keyPath: The writable key path to reinterpret.
/// - Returns: The same key path, typed as `_SendableWritableKeyPath<Root, Value>`.
func sendableKeyPath<Root, Value>(
    _ keyPath: WritableKeyPath<Root, Value>
) -> _SendableWritableKeyPath<Root, Value> {
    let reinterpreted: _SendableWritableKeyPath<Root, Value> = unsafeBitCast(
        keyPath,
        to: _SendableWritableKeyPath<Root, Value>.self
    )
    return reinterpreted
}

extension Observable {
    /// Exposes the value at `keyPath` as an `AsyncStream`.
    ///
    /// The stream first receives the value read when `observe` is called. After that,
    /// every time observation tracking reports a change, the value is read again on the
    /// main queue and yielded, and tracking is registered again for the next change.
    ///
    /// When the stream terminates, a flag is set so that pending and future change
    /// callbacks return without yielding or re-registering.
    ///
    /// - Parameter keyPath: The property to watch on `self`.
    /// - Returns: A stream of the property's values, starting with the current one.
    @MainActor
    func observe<V>(keyPath: KeyPath<Self, V>) -> AsyncStream<V> where KeyPath<Self, V>: Sendable {
        let trackedPath = sendableKeyPath(keyPath)
        let (stream, continuation) = AsyncStream.makeStream(of: V.self)
        // Set to `true` once the stream terminates; read from the change callbacks.
        let isCancelled = Mutex(false)

        // Registers tracking for a single change, then re-arms itself from the main queue.
        @Sendable func track() {
            withObservationTracking {
                self[keyPath: trackedPath]
            } onChange: {
                guard !isCancelled.withLock({ $0 }) else { return }

                // Hop to the main queue before reading the property again.
                // NOTE(review): presumably `onChange` fires before the new value is
                // stored, which is why the read is deferred — confirm against the
                // Observation documentation.
                DispatchQueue.main.async {
                    // The stream may have terminated while this work item was queued.
                    guard !isCancelled.withLock({ $0 }) else { return }
                    continuation.yield(self[keyPath: trackedPath])
                    track()
                }
            }
        }
        track()

        continuation.onTermination = { _ in
            isCancelled.withLock { $0 = true }
        }

        // Seed the stream with the current value so consumers get a starting point.
        continuation.yield(self[keyPath: trackedPath])
        return stream
    }
}

/// Minimal main-actor-isolated observable model used to exercise `observe(keyPath:)`.
@Observable
@MainActor
final class Model {
    /// Counter mutated by the test; starts at zero.
    var count: Int = 0
}

@MainActor
final class PlaygroundTests: XCTestCase {
    /// Mutates `Model.count` ten times, pausing briefly after each mutation, while a
    /// task collects everything emitted by `observe(keyPath:)`. After cancelling the
    /// task, the collected values are expected to be exactly `1...10`.
    func test_observe() async {
        print("start", #function)
        let model = Model()
        let mutationCount = 10

        // Gathers every streamed value until the task is cancelled.
        let collector = Task {
            var received: [Int] = []
            for await value in model.observe(keyPath: \.count) {
                received.append(value)
            }
            return received
        }

        for _ in 1...mutationCount {
            model.count += 1
            print(model.count)
            // Suspend briefly after each mutation so queued main-actor work can run.
            try? await Task.sleep(for: .seconds(0.01))
        }

        collector.cancel()
        let values = await collector.value
        print("values", values)
        XCTAssertEqual(values.count, mutationCount)
        XCTAssertEqual(values, Array(1...mutationCount))
    }
}

This is based on the gist by @mbrandonw:

The main difference from the gist is that the observation makes sense only for @Observable models annotated with @MainActor. This way, by relying on DispatchQueue.main, it should deliver a reliable stream.

But still, the biggest issue is that a hop is required to recursively observe the property, due to the lack of a publicly available withObservationTracking(_ apply: willSet:didSet:) variant. That might hit you hard, depending on the assumptions you make about the observation, as explained very well by @tcldr:

As @Philippe_Hausler noted in the revised version of the Observable proposal, it was mentioned as a potential candidate for future involvement, though unfortunately, this has not come to pass yet.

However, in the case of AsyncStream delivery as an observation, it already introduces concurrency on every next() item.

1 Like