SE-0395: Observability

A very common problem with observation is stopping it. The proposal includes a handleChanges example:

@MainActor
func handleChanges(_ example: ChangesExample) {
    Task { @MainActor in
        for await change in example.changes(for: [\.someField, \.someOtherField]) {
            ...
        }
    }
}

I think this is exactly the pattern many people will reach for. How would this observation be cancelled? How can example ever be released, and if this observation is part of another object (which is the norm), how would the observer ever be released?

A possible motivating example would be a ViewModel that observes another object and is itself observable.

@MainActor
@Observable class ViewModel {
    private var model: Model

    // This is what the View will observe
    public var filteredBooks: [String] = []

    init(model: Model) {
        self.model = model

        // I've seen folks reach for this pattern often to update the local property.
        // I'm not clear there's a better solution with the initial proposal since there
        // are no nested keyPaths.
        Task { @MainActor in
            for await newBooks in model.values(for: \.books) {
                filteredBooks = newBooks.filter { $0.first == "A" }
            }
        }
    }
}

@Observable class Model {
    var books: [String] = []

    // ... Network/DB async stuff that update `books`...
}

Is there a more correct way to implement this under the current proposal? How can the Task ever be cancelled? What kind of [weak self] dance if any is required here to avoid leaking memory?

As a general comment, I don't see a lot of "real-world-ish" examples in this proposal, other than the SwiftUI example which relies on unspecified changes to SwiftUI (which only Apple can do, and only Apple will know how it is done). Demonstrating how this would work for a small UIKit app that interacts with the network, and that handles errors, memory management, unit tests, and cancellation correctly, could go a long way to validating the design and give a clear example of how we should write our production code.

I assume the same way you'd cancel any other long-running task? By keeping a reference to it and cancel when needed:

_task = Task { @MainActor in
    for await newBooks in model.values(for: \.books) {
        filteredBooks = newBooks.filter { $0.first == "A" }
    }
}

// When needed:
_task?.cancel()
_task = nil

Not sure if there's a better pattern.

This has been a source of some pain for us in our unit tests since moving to Swift Concurrency. We want to observe changes and assert equality for a sequence of changes, but to test that reliably we need to wait until the observation starts. We've had tests that worked fine up until a new version of Xcode, in which timing/priority/something changes slightly and a test or two starts failing. We've implemented some test framework scaffolding to try to alleviate this, and it seems to mostly work, but it also seems slightly less than 100% deterministic. That's not ideal because we want our tests to run without failing day to day.

Apologies in advance for the omnibus replies - I have been swamped for the past couple of days.

The implementation is here: https://github.com/apple/swift/tree/main/stdlib/public/Observation/Sources/Observation

However I am unsure if any nightlies have the most recent updates which are required to build correctly (I still need to verify the toolchains to ensure that is working as expected).

I am very convinced that we could resolve that pretty trivially with some alterations to macros. The information is there; it just needs to be exposed.

The generated memberwise is internal to prevent this issue. Folks needing a public initializer must re-declare their own public initializer that calls to the memberwise version.

Extensions defined in a separate module can't be observable since the base implementation cannot reason about those key paths.

Kind-of... It could be a free function; however the type ObservationTracking would still exist internally.

Initially I had this, however it has some flaws; keeping the oldValue means that it doubles the access to the values (which is a non-starter if there are side effects). By just having the subject and keypath, the ObservedChange only indicates the change and leaves it to the client to decide exactly what to do with that.

The information needed to detect this at runtime is limited to the fact there is no public mechanism to split key paths up into their components. Using non-public variants would mean the KeyPath memory layout would become ABI; which in my opinion is not a good idea. And you are correct it is not easy to enforce at compile time too.

The comment later on by @clayellis is on-point and a very reasonable approach to fix that hole.

Are you looking for a "hey you are being looked at for values" type thing? Or are you really looking for "hey there is a demand for 1 value"? The issue about "missing frames" wasn't ignored - I read your post and did a number of explorations on how to resolve it, and there was a subtle but I think important change to how values(for:) works: it now coalesces values in between the awaits on the next value (specifically crafted from inspiration from your feedback). To be clear, I think that is an improvement.

There is a beginning of observation; the call to values(for:). That starts the coalesce of values in-between that call and the call to next() on the iterator. So, if anything changes in-between that call, the new changed value is the one emitted.

The registrar does have enough information to do this. However there would be a requirement that the setup/teardown have some sort of serial nature to it; else it would be potentially a race where the object gets notifications of start 1, start 2, stop 1, stop 2. I think if there were some sort of mechanism to provide that serial nature it could be added post-facto as a future modification. Which might dovetail into the future work for actor support for observation.

That has some interesting implications to the question - first off it would require self to be Sendable. Currently the implementation does not work like that - unless you make .self have dependencies on all the other stored properties. If so it should work as I understand you are expecting - it would emit the instance of self on each change.

Yes, however there are some macro fixes that I am not sure are in the current nightly (which led to some pretty gnarly errors claiming circular references etc.).

Correct. We also have the added benefit that the macro can be conditional upon its target for the synthesis. E.g. an actor could have a different synthesis applied conforming it to a slightly different protocol etc.

This looks like the macro application failures fixed in a later commit.

Does values(for:) respect cancellation and terminate the sequence? I don’t see any documentation explaining how that works.

Can you rewrite your example in a form that actual code would use? “When needed” is generally during deinit, but in this form, there’s a retain loop, so that would never happen, right? So how would real-world code do this correctly? I’m hoping for something I can show to junior engineers and say “this is how you use it in real apps.”

Given the other edges of the current design in regards to things like atomicity, it doesn't seem out of line to defer that requirement to the developer hooking into the events. Of course, it would be easiest if the event exposed some sort of unique identifier so it's possible to see which observer or observed sequence was still alive, but the events by themselves should be enough for a simple counter implementation. The developer just needs to be careful to lock and scope state updates.
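To make the "simple counter" idea concrete, here is a minimal sketch, assuming the registrar exposed some start/stop events (it currently does not). The type ObserverCounter and its methods didStart() and didStop() are hypothetical names invented for illustration; the lock is what keeps an interleaving such as start 1, start 2, stop 1, stop 2 consistent.

```swift
import Foundation

// Hypothetical helper, not part of the proposal: counts active observers
// from start/stop events that may arrive on any thread.
final class ObserverCounter: @unchecked Sendable {
    private let lock = NSLock()
    private var active = 0

    // Call when an observation starts.
    // Returns true only for the transition from zero to one observer.
    func didStart() -> Bool {
        lock.lock(); defer { lock.unlock() }
        active += 1
        return active == 1
    }

    // Call when an observation stops.
    // Returns true only when the last observer has gone away.
    func didStop() -> Bool {
        lock.lock(); defer { lock.unlock() }
        precondition(active > 0, "stop event without a matching start")
        active -= 1
        return active == 0
    }
}
```

The return values let the observed object begin expensive work on the first observer and tear it down after the last one, without caring which observer is which.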

Yes - the cancellation behavior is that task cancellation results in a cooperative cancellation of the iteration via withTaskCancellationHandler.
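For readers unfamiliar with that mechanism, the following is a rough sketch of cooperative cancellation built on withTaskCancellationHandler. It is not the proposal's implementation: Mailbox, next(), and send(_:) are hypothetical names, and the type supports a single waiter only. The point is that a suspended next() returns nil as soon as the surrounding task is cancelled, which is what ends a for-await loop.

```swift
import Foundation

// Hypothetical single-waiter mailbox, for illustration only.
final class Mailbox: @unchecked Sendable {
    private let lock = NSLock()
    private var waiter: CheckedContinuation<Int?, Never>?
    private var isCancelled = false

    // Suspends until a value is sent, or returns nil once the task is cancelled.
    func next() async -> Int? {
        await withTaskCancellationHandler {
            await withCheckedContinuation { (continuation: CheckedContinuation<Int?, Never>) in
                self.lock.lock()
                if self.isCancelled {
                    // Cancellation already happened: do not park, finish immediately.
                    self.lock.unlock()
                    continuation.resume(returning: nil)
                } else {
                    self.waiter = continuation
                    self.lock.unlock()
                }
            }
        } onCancel: {
            // Runs right away if the task is already cancelled,
            // otherwise at the moment the task is cancelled.
            self.lock.lock()
            self.isCancelled = true
            let parked = self.waiter
            self.waiter = nil
            self.lock.unlock()
            parked?.resume(returning: nil)
        }
    }

    // Delivers a value to the parked waiter; dropped if nobody is waiting.
    func send(_ value: Int) {
        lock.lock()
        let parked = waiter
        waiter = nil
        lock.unlock()
        parked?.resume(returning: value)
    }
}
```

Because the flag and the parked continuation are updated under the same lock, the result is the same whether cancel() lands before or after the consumer suspends.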

@cocoaphony Regarding sequence cancellation see above.

Regarding the retain cycle, I can't see a way to solve it other than directly cancelling the task to break the cycle (not in the deinit, which would never be called, as you mentioned).

I'm curious how the authors envisioned observation cancellation as well. And all things considered, I can only repeat what I've said a couple of comments above in this thread: perhaps async sequences don't have the required semantics to cover all the cases needed here.
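For comparison, one shape that is sometimes used with plain async sequences is to capture self weakly in the task and take a strong reference only while handling a single element, so that deinit can run and cancel the task. The sketch below is an assumption-laden illustration, not something the proposal prescribes: FilteredBooksModel, updates, and apply(_:) are hypothetical names, an AsyncStream stands in for values(for:), and a lock replaces the thread's @MainActor isolation to keep the example self-contained.

```swift
import Foundation

// Hypothetical view model. `updates` stands in for the proposal's values(for:);
// any async sequence of book lists would do.
final class FilteredBooksModel: @unchecked Sendable {
    private let lock = NSLock()
    private var storedBooks: [String] = []
    private var observation: Task<Void, Never>?

    var filteredBooks: [String] {
        lock.lock(); defer { lock.unlock() }
        return storedBooks
    }

    init(updates: AsyncStream<[String]>) {
        // Capture self weakly so the task never keeps the view model alive.
        observation = Task { [weak self] in
            for await books in updates {
                // Strong reference only while one element is being handled.
                guard let self else { return }
                self.apply(books)
            }
        }
    }

    // Keeps only the titles that start with "A", as in the thread's example.
    func apply(_ books: [String]) {
        let filtered = books.filter { $0.first == "A" }
        lock.lock(); defer { lock.unlock() }
        storedBooks = filtered
    }

    deinit {
        // Cancelling ends the loop and releases whatever the sequence retains.
        observation?.cancel()
    }
}
```

Whether this is acceptable depends on the sequence honouring cancellation and on nothing else in the closure capturing self strongly, which is easy to get wrong and is part of the concern raised above.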

An example of what currently doesn't work:

class Subject: Sendable {
    let _value = OSAllocatedUnfairLock(uncheckedState: false)
    public var value: Bool {
        get { _value.withLock { $0 } }
        set { _value.withLock { $0 = newValue } }
    }
}

let obj = Subject()
Task {
    print("\(obj.value)")
    for await value in obj.values(for: \.value) {
        print("\(value)")
    }
}
obj.value = true

This code is very likely to at some point print "true". It is not, however, guaranteed to do so, as the assignment to obj.value can happen in between the first print and the call to obj.values(for:). For a Sendable type, starting the coalesce of values at the time obj.values(for:) is called is too late; the observation has to start in the "dirty" state because it is impossible to guarantee that the value did not change between when it was last read and when the observation began.

For non-Sendable types this is not a problem as they use cooperative multitasking rather than preemptive and you control where the suspension points are.

It can work with this:

class Subject: Sendable {
    let _value = OSAllocatedUnfairLock(uncheckedState: false)
    public var value: Bool {
        get {
            _$observationRegistrar.access(self, keyPath: \.value)
            return _value.withLock { $0 } 
        }
        set { 
            _$observationRegistrar.withMutation(of: self, keyPath: \.value) {
                _value.withLock { $0 = newValue } 
            }
        }
    }
}

let obj = Subject()
Task {
    print("\(obj.value)")
    for await value in obj.values(for: \.value) {
        print("\(value)")
    }
}
obj.value = true

However, the dirty state that you mentioned later is still the case.

I don't see how that works. How does that prevent the assignment from occurring between the first print and the call to obj.values(for:)? Is the idea that ObservationRegistrar will track changes even with no current observers and report changes which happened before the observation was added? That's equivalent to what I've been asking for, but it's not something that the proposal says will happen.

To be clear that can't even be done with callbacks/closures.

The subject can't have any foreknowledge of things that may eventually execute. As soon as you spawn a task to call anything (AsyncSequence or otherwise) the next line after the task may execute before or after that task starts.

Now this does not have an issue when the task is scheduled on the same actor as the current execution, e.g.:

@SomeGlobalActor
func example() /* NOTE: this is NOT async*/ {
  let obj = Subject()
  Task { @SomeGlobalActor in
    for await value in obj.values(for: \.value) {
    }
  }
  obj.value = true
}

The task cannot be started on that actor during the current call to the function because the function is not suspended. This has the advantage that it can actually work in those scenarios - however callbacks cannot since there is no suspension gating them.

So I think the objection about the first value, for examples such as this one, is more an objection about the nature of tasks.

That can be done with callbacks, closures, or an AsyncSequence. The way to make it work is to send the initial value when the observation is added (perhaps optionally, as with KVO).
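As a minimal sketch of that idea in callback form, assuming a hypothetical ObservedValue type that is not part of the proposal: the handler is invoked once at registration time when sendInitial is true, which loosely mirrors the .initial option of KVO.

```swift
// Hypothetical callback-based observable box, for illustration only.
// Not thread-safe; it only shows the initial-value behaviour.
final class ObservedValue<Value> {
    private var handlers: [(Value) -> Void] = []

    var value: Value {
        // Notify every registered handler after each assignment.
        didSet { handlers.forEach { $0(value) } }
    }

    init(_ value: Value) {
        self.value = value
    }

    // Registers a handler. With sendInitial the handler immediately receives
    // the current value, so a change made before registration is not missed.
    func observe(sendInitial: Bool = true, _ handler: @escaping (Value) -> Void) {
        handlers.append(handler)
        if sendInitial { handler(value) }
    }
}
```

With this shape, an assignment that happens before observe is called still reaches the observer through the initial callback, and later assignments arrive through didSet.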

Are you sure that's the case? I seem to remember someone mentioning that, right now at least, the Task closures start on the global executor and then hop to their context executor. Eventually the compiler might be smart enough to know that the executor in both contexts is the same and remove the hop, but it doesn't right now.

I think the issue is that, at least in your example, even adding the observer is concurrent, so the notion of "initial" value changes by the time it executes. I do wonder whether, if the sequence were captured synchronously, it would buffer the changes before iteration started.

@SomeGlobalActor
func example() /* NOTE: this is NOT async*/ {
  let obj = Subject()
  let values = obj.values(for: \.value)
  obj.value = true
  Task { @SomeGlobalActor in
    for await value in values {
    }
  }
}

Of course that may be invalid due to concurrency concerns. (On a related note I still don't understand the isolation concerns here.)

In that case there is enough information for it to emit true from the values iteration. (and that is the current behavior)

The conceptual idea is that values(for:) assumes that at the time of registration the property being observed is "dirty" and schedules a notification as-if it had seen a modification to that property. In between deliveries of events we don't care what the actual value of the property is; only whether or not it has changed (and our initial state is that we don't know so we have to assume that it has). At the time of delivery we pick what is then the latest value and send it. By the time the code processes this new value it may already be stale, in which case we need to have the guarantee that another change event will be delivered in the future.

This means that adding an observation will sometimes cause a spurious UI rerender/etc. if the value genuinely hasn't changed, but in exchange you will never fail to rerender even if the values are mutated while you are setting up your observation.
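The "assume dirty at registration" idea can be sketched roughly as follows. This is an illustration under assumptions, not the proposal's implementation: Source, CoalescingObserver, markChanged(), and deliverIfNeeded() are hypothetical names, and the sketch is single-threaded to keep the state machine visible.

```swift
// Hypothetical observed object.
final class Source {
    var value = false
}

// Hypothetical observer that records only whether something changed,
// and reads the latest value at delivery time.
final class CoalescingObserver<Value> {
    // Start dirty: a change may have happened before registration.
    private var isDirty = true
    private let read: () -> Value

    init(read: @escaping () -> Value) {
        self.read = read
    }

    // Called for every mutation; repeated calls coalesce into one pending delivery.
    func markChanged() {
        isDirty = true
    }

    // Returns the latest value if a delivery is pending, otherwise nil.
    func deliverIfNeeded() -> Value? {
        guard isDirty else { return nil }
        isDirty = false
        return read()
    }
}
```

The first delivery happens even if nothing changed after registration, which is the spurious update mentioned above, and any number of mutations between two deliveries collapse into a single delivery of the latest value.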

What happens to subclasses? I suppose the @Observable macro can't touch any subclass and properties defined in a subclass won't magically become observable. So you'll have to add @Observable to every subclass (unless you want to take advantage of this to create some unobservable properties). I see nothing on that topic, but that seems like something important to mention.

I guess I should mention an alternative to the "send the initial value" idea to fix this race condition even though I think it's a bad solution. The proposal could explicitly guarantee that the following works:

Task {
    let values = obj.values(for: \.value)
    print("\(obj.value)")
    for await value in values {
        print("\(value)")
    }
}

If context.register(.changes, properties: properties) were in ObservedValues.init() rather than ObservedValues.Iterator.init() this would never fail to print true regardless of when the assignment on a different thread is sequenced (it also would break if it were iterated more or less than once; further implementation changes would be needed).

I think this is a bad solution though, because it makes the nice, elegant, obvious way to use the API subtly incorrect in a way that is very hard to spot. In most apps, writing the obvious code would honestly nearly always be fine in practice, but if a framework like SwiftUI made that mistake, it would be the sort of subtle bug that's impossible to ever reproduce, where things occasionally just don't quite work right in confusing ways. The "correct" thing is also just unpleasant to write, especially if you're doing something that isn't as trivial as printing a value.

I think an "observation" system has subtle semantics that are slightly different from a "publishing" system (such as Combine).

In an observation system, what you want is a "local" value to "mirror" a "remote" value. (I'm using scare quotes everywhere to emphasize that the terminology isn't to be taken too literally.)

The essential requirement of an observation system is this:

  • If the local value is currently different from the remote value, then it's guaranteed that the system will emit an update from the remote value in a "reasonable" amount of time.

The reasonable amount of time usually relates to the user interface. The update should not be delayed so long that a user might reasonably end up mis-believing that the local value is stable and correct when it's not.

  • The absolute number and timing of remote changes aren't important to the observer, except to the extent demanded by the first requirement (that a mismatch between local and remote be promptly resolved).

This is what legitimizes coalescing. It's also why a set of remote changes that doesn't cause any mismatch doesn't need to emit a notification to the observer (and why the observer typically doesn't want to know about idempotent change sets anyway — flickery UIs are typically not desirable).

It's important, therefore, that an observation system have a good mechanism for providing the guarantee of mismatch resolution. In particular, because it's often hard for observers to get started without a mismatch initially, it's important for initial-value-emission to be part of the observation system.

KVO has the correct observation semantics, which is one reason why it was always a better observation system than (say) pure notifications via NotificationCenter, or Combine. The functionality is almost identical in each case — but not quite.

Now, I don't know whether SE-0395 ought to be pitched as both an observation system and a publishing system bundled together, to cover both use-cases, but I think it would be an unfortunate mistake to make it a publishing system only, as if an observation system is a trivial subset. I don't think it is a trivial subset.

Can you clarify how this is reconciled if the intended public initializer is identical to the generated memberwise one? For example:

@Observable public class ImageDescription {
    public var width: Double
    public var height: Double

    public init(width: Double, height: Double) {
        // How is this implemented without recursion / conflicting declarations?
    }
}