Two approaches to observe @Observable object without/outside View

Hi all,

I'm trying to refactor my ObservableObject with publishers to @Observable objects.

Currently I actually use combine to observe between these ObservableObjects for some states usage. But I can't do so in @Observable objects.

So now I tried to ways to do so:

  1. Let @Observable class have a internal @ObservableObject as a publisher to publish certain properties for usage.
  2. Use AsyncStream to keep call withObservationTrack.

Here is my code:

@Observable
class MyObservalbeObject: @unchecked Sendable {
  let publishers = MySubObject()
  var name: String = "Frank"
  var age: Int = 40
  var nickname: String = ""
  
  init() {
  }
  
  static let shared: MyObservalbeObject = {
    return MyObservalbeObject()
  }()
  
  func setNickname(_ value: String) {
    self.nickname = value
    self.publishers.nickName = value
  }
  
  func observeName() -> AsyncStream<String> {
    return AsyncStream { continuation in
      internalTrackName(continuation)
      
      // Handle stream termination to invalidate observations
      continuation.onTermination = { _ in
        continuation.finish()
        print("observeName DONE.")
      }
    }
  }
  
  private func internalTrackName(_ continuation: AsyncStream<String>.Continuation) {
    _ = withObservationTracking {
      continuation.yield(self.name)
    } onChange: {
      Task {
        await MainActor.run {
          self.internalTrackName(continuation)
        }
      }
    }
  }
}

class MySubObject: ObservableObject, @unchecked Sendable {
  @Published internal var nickName: String = ""
}

class MyModel: ObservableObject, @unchecked Sendable {
  let myObject = MyObservalbeObject.shared
  private var cancellable: AnyCancellable?
  
  var task: Task<Void, Never>? = nil

  init() {
    trackNameFromObservedObjectProperty()
    trackNameFromAsyncStream()
  }
  
  func changeName() {
    myObject.name = "MyObject"
  }
  
  func trackNameFromObservedObjectProperty() {
    cancellable = myObject.publishers.$nickName
      .sink { [weak self] newName in
        print("Updated name: \(newName)")
      }
  }
  
  func trackNameFromAsyncStream() {
    task = Task {
      for await name in myObject.observeName() {
        print("trackName3: \(name)")
      }
    }
    
    Task {
      await task?.value
      task = nil
    }
  }
  
  func cancelTracking() {
    task?.cancel()
    task = nil
  }
}

I'm wondering it's the proper way to use in production?

1 Like

You might be interested in this thread https://forums.swift.org/t/how-to-use-observation-to-actually-observe-changes-to-a-property

1 Like

Yes, I actually referred from the post which is really inspiring.

Disclaimer: I have no idea if this is a good solution in any means

// https://forums.swift.org/t/sendablekeypath/67195/8
#if compiler(>=6)
public typealias _SendableKeyPath<Root, Value> = any KeyPath<Root, Value> & Sendable
public typealias _SendableWritableKeyPath<Root, Value> = any WritableKeyPath<Root, Value>
& Sendable
#else
public typealias _SendableKeyPath<Root, Value> = KeyPath<Root, Value>
public typealias _SendableWritableKeyPath<Root, Value> = WritableKeyPath<Root, Value>
#endif

func sendableKeyPath<Root, Value>(
    _ keyPath: KeyPath<Root, Value>
) -> _SendableKeyPath<Root, Value> {
    unsafeBitCast(keyPath, to: _SendableKeyPath<Root, Value>.self)
}

func sendableKeyPath<Root, Value>(
    _ keyPath: WritableKeyPath<Root, Value>
) -> _SendableWritableKeyPath<Root, Value> {
    unsafeBitCast(keyPath, to: _SendableWritableKeyPath<Root, Value>.self)
}

extension Observable {
    @MainActor
    func observe<V>(keyPath: KeyPath<Self, V>) -> AsyncStream<V> where KeyPath<Self, V>: Sendable {
        let keyPath = sendableKeyPath(keyPath)
        let (stream, continuation) = AsyncStream.makeStream(of: V.self)
        let isCancelled = Mutex(false)

        @Sendable func _observe() {
            withObservationTracking({
                return self[keyPath: keyPath]
            }, onChange: {
                if isCancelled.withLock({ $0 }) {
                    return
                }
                DispatchQueue.main.async {
                    if isCancelled.withLock({ $0 }) {
                        return
                    }
                    continuation.yield(self[keyPath: keyPath])
                    _observe()
                }
            })
        }
        _observe()

        continuation.onTermination = { _ in
            isCancelled.withLock { $0 = true }
        }

        defer {
            continuation.yield(self[keyPath: keyPath])
        }

        return stream
    }
}

@Observable
@MainActor
final class Model {
    var count = 0
}

@MainActor
final class PlaygroundTests: XCTestCase {
    func test_observe() async {
        print("start", #function)
        let model = Model()

        let task = Task {
            var result: [Int] = []
            for await item in model.observe(keyPath: \.count) {
                result.append(item)
            }
            return result
        }

        let max = 10
        for _ in 1...max {
            model.count += 1
            print(model.count)
            try? await Task.sleep(for: .seconds(0.01))
        }

        task.cancel()
        let values = await task.value
        print("values", values)
        XCTAssertEqual(values.count, max)       // ✅
        XCTAssertEqual(values, Array(1...max))   // ✅
    }
}

This is based on the @mbrandonw:

The main difference from the gist is that the observation makes sense only for the @Observable models annotated with @MainActor. This way based on DispatchQueue.main it should deliver a reliable stream.

But still, the biggest issue is that there is a hop which is required to recursively observe the property and due to the lack of publicity available withObservationTracking(_ apply: willSet:didSet:) variant. That might hit you hard based on the assumption that you make around the observation greatly explained by @tcldr:

As @Philippe_Hausler noted in the revised version of the Observable proposal, it was mentioned as a potential candidate for future involvement, though unfortunately, this has not come to pass yet.

However, in the case of the AsyncStream delivery as an observation, it already introduce concurrency on every next() item.

1 Like

Depending on what exactly you want to do when you are observing fields of an @Observable model, you may be able to get away with something very low-tech:

@Observable
class FeatureModel {
  var name: String = "Frank" { 
    didSet { print("name changed") }
  }
  var age: Int = 40 { 
    didSet { print("age changed") }
  }
  var nickname: String = "" { 
    didSet { print("nickname changed") }
  }
}

Can you describe what you are doing in your ObservableObject currently that requires observation of each field?

Since your onChange retains self, this object can never be deallocated if its tracked objects don’t change after tracking begins. You could try to send a signal or post notification without retaining self.

Otherwise it’ll only work it in a application where the object that tracks and the objects being tracked all are never deallocated.

1 Like

Correct. It is tricky to cancel withObservationTracking with existing form where API does not expose explicit way to cancel the observation.

I played around a bit and I found a way to be working, allowing to deinit the observed object + finish the stream (but take it with a grain of salt). Finishing the stream is more tricky as once the object is deallocated there is no signal (at least I'm not aware) on which the stream could be finished. withObservationTracking just finishes when instance gets deallocated. For that purpose I spin an unstructured Task to watch for object non nil otherwise finish the stream.

"Improved" version with a test on which I checked the behavior:

extension Observable {
    @MainActor
    func observe<V>(keyPath: KeyPath<Self, V>) -> AsyncStream<V> where KeyPath<Self, V>: Sendable, Self: AnyObject {
        let keyPath = sendableKeyPath(keyPath)
        let (stream, continuation) = AsyncStream.makeStream(of: V.self)
        let isCancelled = Mutex(false)
        var zombieSelfDetectorTask: Task<Void, Never>?

        Self._observe { [weak self] in
            zombieSelfDetectorTask?.cancel()
            zombieSelfDetectorTask = nil

            guard let self else {
                isCancelled.withLock { $0 = true }
                continuation.finish()
                return
            }

            let value = self[keyPath: keyPath]
            continuation.yield(value)

            // Watch for zombie self
            zombieSelfDetectorTask = Task { @MainActor [weak self] in
                while self != nil {
                    try? await Task.sleep(for: .seconds(1))
                }
                isCancelled.withLock { $0 = true }
                continuation.finish()
            }
        }

        continuation.onTermination = { _ in
            isCancelled.withLock { $0 = true }
        }

        return stream
    }

    @MainActor
    private static func _observe(execute: @MainActor @escaping () -> Void) {
        withObservationTracking {
            execute()
        } onChange: {
            DispatchQueue.main.async {
                _observe(execute: execute)
            }
        }
    }
}

@Observable
@MainActor
private class Model {
    var count = 0

    deinit {
        print("deinit")
    }
}

@MainActor
final class PlaygroundTests: XCTestCase {
    func test_observe_deinit() async {
        var model: Model! = Model()
        let stream: AsyncStream<Int> = model.observe(keyPath: \.count)
        let task = Task {
            for await value in stream {
                print("consume", value)
            }
        }

        Task { [weak model] in
            for _ in 0 ... 1000 {
                guard model != nil else { return }
                try await Task.sleep(for: .seconds(2))
                print("produce next", model?.count)
                model?.count += 1
            }
        }

        try? await Task.sleep(for: .seconds(1))
        model = nil
        await task.value
        // Double check if nothing is produced/consumed anymore
        try? await Task.sleep(for: .seconds(5))
    }
}
1 Like

My purpose is to get a continuous notification on onChange and use it like combine's subscription as simple code as possible in some Tasks.

didSet maybe another choice but sometime I met issues it's not working. So I choose AsyncStream.

You are right. I need be more careful in the closure.