Two approaches to observing an @Observable object without/outside a View

Hi all,

I'm trying to refactor my ObservableObject with publishers to @Observable objects.

Currently I use Combine to observe state across these ObservableObjects, but I can't do the same with @Observable objects.

So now I have tried two ways to do so:

  1. Let the @Observable class hold an internal ObservableObject that acts as a publisher for the properties that need to be observed.
  2. Use an AsyncStream that keeps calling withObservationTracking.

Here is my code:

/// Sample model showing two ways to observe an `@Observable` object outside
/// of a view: a Combine bridge (`publishers`) and an `AsyncStream` driven by
/// `withObservationTracking`.
@Observable
class MyObservalbeObject: @unchecked Sendable {
  /// Combine bridge; `setNickname(_:)` mirrors the nickname into it.
  let publishers = MySubObject()
  var name: String = "Frank"
  var age: Int = 40
  var nickname: String = ""

  init() {
  }

  static let shared = MyObservalbeObject()

  /// Stores `value` as the nickname and forwards it to the Combine bridge.
  func setNickname(_ value: String) {
    nickname = value
    publishers.nickName = value
  }

  /// Returns a stream that emits the current `name` immediately and then the
  /// value read after each observed change.
  func observeName() -> AsyncStream<String> {
    AsyncStream { continuation in
      // Install the handler before tracking starts. The stream is already
      // finished when this runs, so calling `finish()` again is unnecessary.
      continuation.onTermination = { _ in
        print("observeName DONE.")
      }

      internalTrackName(continuation)
    }
  }

  /// Yields the current `name` and re-arms observation for the next change.
  ///
  /// `withObservationTracking` only reports the first change after it is
  /// armed, so this method calls itself again from `onChange`.
  private func internalTrackName(_ continuation: AsyncStream<String>.Continuation) {
    // Once the consumer is gone, `yield` reports `.terminated`. Stop here so
    // the observation chain ends instead of re-arming forever.
    if case .terminated = continuation.yield(name) {
      return
    }

    withObservationTracking {
      _ = self.name
    } onChange: {
      // `onChange` is delivered ahead of the mutation; hop to the main actor
      // so the next pass reads the updated value.
      Task { @MainActor in
        self.internalTrackName(continuation)
      }
    }
  }
}

/// Combine-backed holder that exposes a nickname through `@Published`.
class MySubObject: ObservableObject, @unchecked Sendable {
  /// Current nickname; subscribers attach through `$nickName`.
  @Published var nickName = ""
}

/// Consumer that exercises both observation strategies against the shared
/// `MyObservalbeObject`: a Combine subscription and an `AsyncStream` loop.
class MyModel: ObservableObject, @unchecked Sendable {
  let myObject = MyObservalbeObject.shared
  private var cancellable: AnyCancellable?

  /// Task that consumes `observeName()`; cleared by `cancelTracking()`.
  var task: Task<Void, Never>?

  init() {
    trackNameFromObservedObjectProperty()
    trackNameFromAsyncStream()
  }

  deinit {
    // The stream never finishes on its own, so stop the consumer explicitly.
    task?.cancel()
  }

  /// Writes a fixed name into the observed object to trigger the stream.
  func changeName() {
    myObject.name = "MyObject"
  }

  /// Subscribes to the Combine bridge and logs every published nickname.
  func trackNameFromObservedObjectProperty() {
    // The closure does not touch `self`, so no capture list is needed.
    cancellable = myObject.publishers.$nickName
      .sink { newName in
        print("Updated name: \(newName)")
      }
  }

  /// Starts (or restarts) the task that logs every value of `observeName()`.
  func trackNameFromAsyncStream() {
    // Never leave an earlier consumer running when tracking is restarted.
    task?.cancel()

    // Capture only `myObject`: capturing `self` would create a retain cycle
    // (self -> task -> self) that lasts until the task is cancelled.
    task = Task { [myObject] in
      for await name in myObject.observeName() {
        print("trackName3: \(name)")
      }
    }
  }

  /// Cancels the stream consumer, which also terminates the stream.
  func cancelTracking() {
    task?.cancel()
    task = nil
  }
}

I'm wondering whether either of these is a proper approach to use in production?

You might be interested in this thread https://forums.swift.org/t/how-to-use-observation-to-actually-observe-changes-to-a-property

Yes, I actually based my code on that post, which is really inspiring.

Disclaimer: I have no idea whether this is a good solution by any means.

// https://forums.swift.org/t/sendablekeypath/67195/8
// Key-path aliases used by the observation helper in this sample.
// With a Swift 6 or newer compiler they are existentials that add a
// `Sendable` requirement; on older compilers they are the plain key-path types.
#if compiler(>=6)
// A read-only key path that is also `Sendable`.
public typealias _SendableKeyPath<Root, Value> = any KeyPath<Root, Value> & Sendable
// A writable key path that is also `Sendable` (the composition continues on the next line).
public typealias _SendableWritableKeyPath<Root, Value> = any WritableKeyPath<Root, Value>
& Sendable
#else
// Older compilers: no `Sendable` composition, just the key-path types themselves.
public typealias _SendableKeyPath<Root, Value> = KeyPath<Root, Value>
public typealias _SendableWritableKeyPath<Root, Value> = WritableKeyPath<Root, Value>
#endif

/// Reinterprets a read-only key path as `_SendableKeyPath`.
///
/// The conversion is an `unsafeBitCast`; nothing is checked at run time, so
/// the caller is responsible for the key path being safe to share.
/// - Parameter keyPath: The key path to reinterpret.
/// - Returns: The same key path typed as `_SendableKeyPath<Root, Value>`.
func sendableKeyPath<Root, Value>(
    _ keyPath: KeyPath<Root, Value>
) -> _SendableKeyPath<Root, Value> {
    let converted = unsafeBitCast(keyPath, to: _SendableKeyPath<Root, Value>.self)
    return converted
}

/// Reinterprets a writable key path as `_SendableWritableKeyPath`.
///
/// The conversion is an `unsafeBitCast`; nothing is checked at run time, so
/// the caller is responsible for the key path being safe to share.
/// - Parameter keyPath: The writable key path to reinterpret.
/// - Returns: The same key path typed as `_SendableWritableKeyPath<Root, Value>`.
func sendableKeyPath<Root, Value>(
    _ keyPath: WritableKeyPath<Root, Value>
) -> _SendableWritableKeyPath<Root, Value> {
    let converted = unsafeBitCast(keyPath, to: _SendableWritableKeyPath<Root, Value>.self)
    return converted
}

extension Observable {
    /// Streams the value at `keyPath`: the current value first, then the
    /// value read on the main queue after each observed change.
    ///
    /// - Parameter keyPath: The property to observe.
    /// - Returns: A stream of values; terminating it stops further delivery.
    @MainActor
    func observe<V>(keyPath: KeyPath<Self, V>) -> AsyncStream<V> where KeyPath<Self, V>: Sendable {
        let path = sendableKeyPath(keyPath)
        let (stream, continuation) = AsyncStream.makeStream(of: V.self)
        // Set by `onTermination`; checked before scheduling and before delivering.
        let isCancelled = Mutex(false)

        // Arms a single observation and re-arms itself after each delivery.
        @Sendable func track() {
            _ = withObservationTracking {
                self[keyPath: path]
            } onChange: {
                guard !isCancelled.withLock({ $0 }) else { return }
                // Read the value on a later main-queue turn, not inside `onChange`.
                DispatchQueue.main.async {
                    guard !isCancelled.withLock({ $0 }) else { return }
                    continuation.yield(self[keyPath: path])
                    track()
                }
            }
        }
        track()

        continuation.onTermination = { _ in
            isCancelled.withLock { $0 = true }
        }

        // Seed the stream with the current value just before handing it out.
        continuation.yield(self[keyPath: path])
        return stream
    }
}

/// Minimal main-actor-isolated observable model with a single counter.
@MainActor
@Observable
final class Model {
    /// Counter value; starts at zero.
    var count: Int = 0
}

/// Exercises `observe(keyPath:)` against `Model.count`.
@MainActor
final class PlaygroundTests: XCTestCase {
    /// Increments the counter a fixed number of times and checks that the
    /// stream delivered exactly those values, in order.
    func test_observe() async {
        print("start", #function)
        let model = Model()

        // Gathers every emitted value until the task is cancelled.
        let collector = Task {
            var received: [Int] = []
            for await value in model.observe(keyPath: \.count) {
                received.append(value)
            }
            return received
        }

        let iterations = 10
        for _ in 1...iterations {
            model.count += 1
            print(model.count)
            // Pause between mutations, as in the original sample.
            try? await Task.sleep(for: .seconds(0.01))
        }

        // Cancelling ends the `for await` loop so the collected values come back.
        collector.cancel()
        let values = await collector.value
        print("values", values)
        XCTAssertEqual(values.count, iterations)         // ✅
        XCTAssertEqual(values, Array(1...iterations))    // ✅
    }
}

This is based on the gist by @mbrandonw:

The main difference from the gist is that this observation only makes sense for @Observable models annotated with @MainActor. Because every value is delivered through DispatchQueue.main, it should produce a reliable stream.

But still, the biggest issue is that a hop is required to recursively re-observe the property, due to the lack of a publicly available withObservationTracking(_ apply: willSet:didSet:) variant. That might hit you hard, depending on the assumptions you make about the observation, as explained in detail by @tcldr:

As @Philippe_Hausler noted in the revised version of the Observable proposal, it was mentioned as a potential candidate for future involvement, though unfortunately, this has not come to pass yet.

However, when the observation is delivered through an AsyncStream, it already introduces concurrency on every next() item.

1 Like

Depending on what exactly you want to do when you are observing fields of an @Observable model, you may be able to get away with something very low-tech:

/// Observable model that logs a message from `didSet` whenever one of its
/// fields is assigned.
@Observable
class FeatureModel {
  /// Display name; logs on every assignment.
  var name: String = "Frank" {
    didSet {
      print("name changed")
    }
  }

  /// Age; logs on every assignment.
  var age: Int = 40 {
    didSet {
      print("age changed")
    }
  }

  /// Nickname; logs on every assignment.
  var nickname: String = "" {
    didSet {
      print("nickname changed")
    }
  }
}

Can you describe what you are doing in your ObservableObject currently that requires observation of each field?