SE-0475: Transactional Observation of Values

I got curious to try and reproduce this. @Philippe_Hausler is there an updated toolchain? The one in the first post is not available anymore. Thanks!

Philippe posted a packaged version of the implementation here:

—-

I will let Philippe respond more completely but will note that by my reading this is not an accurate description of the proposed semantics:

Is there part of the proposal text which you think should be edited to make this clearer?

1 Like

I believe it is accurate, because I draw logical conclusions from the documented lack of buffer. When next() is called, it can not return the last known value, because this value was not buffered. In the cases where this last known value has not been notified before, it is lost forever.

See the sample code I provided: you'll see that the value modified at step (3) CAN NOT be returned at step (5). If the value is not further modified, the app displays an obsolete value forever.

Maybe this can be OK in some scenarios. IMHO it is generally not.

Is there part of the proposal text which you think should be edited to make this clearer?

The "Detailed design" section is written upside down: it derives the behavior from the implementation, instead of deriving the implementation from the desired behavior. I suggest this gets reversed.

2 Likes

I have setup the following project that uses Observations do bind a model to an UILabel:

@Observable
class Model {
    var count = 0
}

class ViewController: UIViewController {
    
    let label = UILabel()
    
    lazy var button = UIButton(
        primaryAction: UIAction(title: "count") { [unowned self] _ in
            model.count += 1
        }
    )
    
    let model = Model()

    override func viewDidLoad() {
        super.viewDidLoad()
        
        view.backgroundColor = .systemBackground
        
        layout()
        
        let counts = Observations { self.model.count }
        
        Task {
          for await count in counts {
              await display(count)
          }
        }
        
    }
    
    func display(_ count: Int) async {
        try? await Task.sleep(for: .seconds(2))
        self.label.text = "\(count)"
    }

}

@gwendal.roue do you mean that if I double click the button, the label will end up stuck displaying 1, despite the final value of count is 2? I can't reproduce. There's a delay but the label always displays 2 eventually.

1 Like

I don’t believe a buffer is required in order to ensure eventual consistency in your example. When an observable property is read, the Observation library subscribes to it before reading the current value; this is an observation pattern which would be correct even under concurrent modification. Observations must simply ensure that any change notifications it receives after starting evaluation (rather than after evaluation is complete) reliably cause the sequence to be marked as dirty, so that the next call to next() will trigger reevaluation rather than waiting. This can even be done locklessly without much trouble.

5 Likes

You're right, the dirty flag you describe totally fits the bill :+1:

(Instead of returning the last known value, next() would return the current value, which is just as good at first sight).

3 Likes

Right, and that’s unavoidable. If there’s a modification that’s truly concurrent with the call to next(), it’s of course non-deterministic whether the call will observe the result of that change, because there’s a possible ordering of events that’s indistinguishable from the change just happening later. But the library can at least guarantee that subsequent calls will eventually see the effects of the change.

5 Likes

Yeah, I think a key bit of insight here is that the target of the Observations type can be an arbitrary expression and not merely a property value. In simple cases like person.name you can talk about “the value” as a single thing that exists both at willSet time and at delivery time, but in more complex cases the target expression might not bear any resemblance to any expression anywhere else in the codebase—in that situation the properties which trigger the willSet observation may influence the value of the expression, but the only way for us to actually obtain the value of the target expression is to reevaluate it from scratch after the transaction has closed. And once we do reevaluate it, the set of properties we end up observing on the next go-around might be very different than on the first go-around!

1 Like

I'm happy we have an agreement. If I understand correctly, the proposal does not handle this yet.

This is what I'd like to press on a bit—aside from the delivery of the initial value for iterators aside from the first, it's been my reading that this has always been the semantics of the proposal, so I'm curious to get your take on how that could be made more clear/explicit. Would a more direct statement about 'eventual consistency' somewhere up-front be sufficient, or do you think that the proposal as written is actively misleading in some way?

The proposal, my responses, and implementation account for this; the tracking is re-invoked per each time next is called. It is not a one-shot and done tracking.

E.g.

@Observable 
final class Model {
  var property1: Bool = false
  var property2 = "a"
  var property3 = "b
}

let values = Observations {
  if model.property1 { 
     return property2
  } else { 
     return property3
  }
}

This ends up tracking property1 then property3 initially. Then when property1 changes it ends up tracking property1 and property2 for willSet's. This is due to the existing behavior of Observation.

1 Like

I indeed think that "eventual consistency" says nothing about when, precisely, the iterations eventually get their fresh value. There lies the problem.

The problematic scenario that I outlined above fits this wording: the iteration will get a fresh value when a future change is performed. All good. But it gets it much too late, as far as I understand, or even never, if no further change is performed.

The proposal does not mention any buffer or "dirty state" as suggested by @John_McCall.
The proposal still contains the sentence "This case dropped the last value of the iteration".

This is my last message about this. I've done everything I can in order to raise eyebrows and the desire to write more test cases. I give up.

2 Likes

Surely proposals should propose behaviour, not implementations?

A system which never delivered an update after the event of concern would not fit the definition of eventual consistency (which promises that a system always resolves to a consistent state eventually, even if no further updates are made).

We generally discourage authors from including implementation details like this in proposals and instead focus on the semantics of the proposed feature. Implementation feedback is very much welcome but is generally out-of-scope for review threads and more appropriately provided on the implementation PR (except insofar as one is trying to understand whether a behavior observed in the prototype implementation is part of the proposed semantics).

This is a good callout of a potential line that could be confusing. My reading of that line, and @Philippe_Hausler can confirm, is that the value was not so much dropped from the sequence, but rather dropped from the output because the program terminated prior to the iterator picking up and printing the final update delivered by the sequence. (I agree this line is potentially ambiguous and worth clarifying!)

5 Likes

That particular example shows that a value will be coalesced if timing is denied to the iteration. The last value is only the last printed value that the program emits, further values would be emitted if the program was permitted to run further.

regarding the proposal text:

Multiple Iterators

the proposal states:

The expectation of observing a property from a type as an AsyncSequence is that multiple iterations of the same sequence from multiple tasks will emit the same values at the same iteration points. The following code is expected to emit the same values in both tasks.

<example of code iterating in different Tasks>...

In this case both tasks will get the same values upon the same events.

what are the definitions of 'iteration points' and 'events' in this context? my assumption is 'awaiting a call to next()' and 'an observation tracking block fired', respectively. if a value won't be 'buffered' anywhere, then each iterator must read a new value independently, which i believe implies that they can end up seeing different values. so either some of the quoted text is a bit misleading, or my assumed definitions are wrong – in either case, clarity here would be welcome.

i'm also inclined to wonder if Gwendal's suggestion upthread that support for multiple iterators perhaps be postponed might be prudent. it seems like it adds increased complexity to the possible program behaviors and risk of bugs/developer confusion (or, alternatively, some motivation for why it is important to support now rather than later). AsyncStream was originally implemented without support for multiple iteration (kind of), but then was later changed to allow it (though i think that sort of bypassed the evolution process).

Cancellation/Termination

the proposal doesn't cover the intended behaviors with respect to Task cancellation. personally i've found the Task cancellation & sequence termination semantics for AsyncStream somewhat bewildering, so i think this subject may deserve a bit more exposition. some questions that might be worth covering:

  1. if there are two iterators awaiting a value, and one's Task is canceled, what is supposed happen?
  2. after a terminal value is returned from the 'source' closure, will it ever be invoked again, or is it guaranteed not to be?

I haven't been following along too closely… but my impression is that a lot of the churn on this design review is over some questions engineers have about the implementation details and behaviors of the Sequence type itself. Since the design review implies that we ship with this Sequence type implementation… we are then ending up in a place where we potentially block the design review on an implementation review.

What if the Sequence was injectable? What if a product engineer that needed to make use of Observed Values made their own decision about the implementation of that Sequence? I do realize that could mean rethinking the original design… but it might also generalize the intended design to an extent that we no longer block a design review on an implementation review.

2 Likes

The latest implementation works, according to my expectations (approximately equivalent to Combine.CurrentValueSubject+buffer(size: 1)+map but in an AsyncSequence kind of way).

Sorry that I contributed to the mess of reviewing the implementation rather than the design but it's hard to understand the limitations of a design where the limitations of an implementation are more concrete.

Anyway... definite thumbs up :+1: from me as I aim to make observing between a Model actor and MainActor views more async/awaity.

7 Likes

This is actually very reminiscent of the similar issue we encountered in throttled sequences:

(which is still an issue AFAIK, we stopped using them (unfortunately, would like to!) after that one)

1 Like

I had a play with the available implementation at GitHub - phausler/ObservationSequence: A test package of the Observed implementation for SE-0475, and I could not exhibit the behavior I was fearing (i.e. that an observer task is not notified of a change if it is not awaiting in next() when a change is committed). I just made a simple test, and did not try to stress-test. The inability of the reviewers to acknowledge my concerns put my level of trust to almost zero (especially after my comments were requested), but at least I could not spot this implementation flaw.

The simple test
import Testing
import Observation
import ObservationSequence

@Observable class Person {
    var name: String
    
    init(name: String) {
        self.name = name
    }
}

// This test should exit.
@MainActor @Test func example() async throws {
    let person = Person(name: "Start")
    let names = Observations { person.name }
    for await name in names {
        print("Fresh name:", name)
        if name == "End" { break }
        try await Task.sleep(nanoseconds: 1_000_000_000)
        person.name = "End" // iteration is not awaiting in next() in this transaction
        try await Task.sleep(nanoseconds: 1_000_000_000)
    }
}
1 Like