I would like to understand this part more. You are saying you don't need to buffer, but the proposed async sequence looks to me like it supports multi-consumer iteration. If that's the case, then we surely need some kind of buffer somewhere, right? What if one of the consumers is slow? Will the fastest consumer dictate the speed and updates for the others get buffered, or will the slowest consumer dictate the speed?
The first iteration's call to `next` will emit the value as it currently stands (via calling `withObservationTracking` and returning that value).
As for iterating on the same isolation: since the transaction bounds `willSet`s to the next suspension point, it will be asynchronous.
I would love to remove that; however, the problem is that `#isolation` can be nil (i.e. your caller's isolation may not be known directly), and therefore you must be in a safe context. That means that the closure MUST be sendable. If there were some sort of mechanism to both ensure an isolation (and get that actor as non-nil) AND have that be somehow overloadable with the existing system (even if we have to disfavor the optional variation), then I would very quickly be in favor of adding an overload to support a non-sendable closure. This is a safety limitation of the language currently.
Yep, they are inferred by the top-level code to be on the main actor. Folks can technically make `Observable` types that are `Sendable`; there are a number of those in the Apple SDK (however, I do recognize that this is perhaps not the common first approach folks may take, and therefore having an isolation is favored).
That is not strictly true: you can have the non-isolated initialization version, and that would require sending across potential isolation domains.
Technically true; however, the buffer is somewhat obscured here. Since the behavior comes from the suspension on the `willSet` (a `Void` signal, it buffers by collapsing into a single job enqueued on the scheduler as an async suspension), when resumed on the isolating actor it will then read the value all at the same time for each iterator. This does have the consequence that there are no absolute guarantees of same-value delivery if you are observing from multiple isolation domains; only if you are doing so from the same isolation.
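To make the collapsing behavior concrete, here is a minimal sketch (not the proposal's actual implementation; the `Coalesced` type and its method names are invented for illustration) of the same idea built on `AsyncStream`: the "buffer" is a single `Void` signal, so any number of pending willSet-style triggers collapse into one wake-up, and the consumer reads the latest value when it resumes.

```swift
import Foundation

// Invented for illustration: models the "buffer" as one collapsed Void signal.
final class Coalesced<Value: Sendable>: @unchecked Sendable {
    private let lock = NSLock()
    private var value: Value
    let updates: AsyncStream<Void>
    private let wake: AsyncStream<Void>.Continuation

    init(_ initial: Value) {
        value = initial
        // bufferingNewest(1) collapses any number of pending signals into one,
        // mirroring how pending transactions collapse into a single dirty state.
        let (stream, continuation) = AsyncStream.makeStream(
            of: Void.self,
            bufferingPolicy: .bufferingNewest(1)
        )
        updates = stream
        wake = continuation
    }

    func update(_ newValue: Value) {
        lock.lock(); value = newValue; lock.unlock()
        wake.yield(()) // carries no value, only "something changed"
    }

    func current() -> Value {
        lock.lock(); defer { lock.unlock() }
        return value
    }
}

let counter = Coalesced(0)
// Three rapid mutations while no consumer is suspended...
counter.update(1)
counter.update(2)
counter.update(3)
// ...leave at most one pending wake-up; a consumer resuming now
// reads only the latest value.
print(counter.current()) // 3
```

A slow consumer therefore never forces values to queue up; it simply observes a wider transaction.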
Just so I get it right: you are saying the buffer is basically on the isolation's underlying executor when we enqueue the job in `willSet`, right?
The second part of your statement is making me a bit nervous though. Let's take your example from above
```swift
let names = Observed { person.firstName + " " + person.lastName }

Task.detached {
    for await name in names {
        print("Task1: \(name)")
    }
}

Task.detached {
    for await name in names {
        print("Task2: \(name)")
    }
}
```
How does this work in practice if there are no buffers? These two tasks are nonisolated, so if I start updating the underlying observable object, might it be that we are dropping values, or what happens here?
```swift
import Observation

@Observable
final class Person {
    var firstName: String
    var lastName: String
    var name: String { "\(firstName) \(lastName)" }

    init(firstName: String, lastName: String) {
        self.firstName = firstName
        self.lastName = lastName
    }
}
```
```swift
let person = Person(firstName: "", lastName: "")
let names = Observed { person.name }

Task.detached {
    for await name in names {
        print("Task1: \(name)")
    }
}

Task.detached {
    for await name in names {
        print("Task2: \(name)")
    }
}

var n = 0
while true {
    person.firstName = "\(n)"
    person.lastName = "\(n)"
    try? await Task.sleep(for: .seconds(0.001)) // specifically chosen to be rather short
    n += 1
}
```
So the order of wakeup is obviously non-deterministic and can change. If a consumer lags behind, it acts as if the transaction is just wider and encompasses a larger swath of potential changes.
One use case that I find very common when working with `@Observable` is the need to cache some derived computation that is rather expensive, e.g. sorting and filtering an Array. With the proposed API, I think it would look something like this if we want to move that logic out of the View and into the `@Observable` type:
```swift
import Foundation
import Observation

@Observable
final class MyList {
    struct Person {
        var firstName: String
        var lastName: String
    }

    var data: [Person] = []
    var sortComparator: [SortDescriptor<Person>] = [SortDescriptor(\.firstName)]
    var sortedData: [Person] = []

    func run() async throws {
        // parenthesized closure: trailing closures aren't allowed in a for-in
        // sequence expression
        for try await sortedData in Observed({ self.data.sorted(using: self.sortComparator) }) {
            self.sortedData = sortedData
        }
    }
}
```
The question now is whether this guarantees to update atomically, even if multiple levels of these caches are involved.
So from a thread-safety standpoint, this does not magically add critical regions to the observable type; it is not strictly atomic.
However, if the question is "does it do that work on the same actor as it is accessed?", then yes: in that case you are awaiting the suspension on the initial calling actor (which, I would hope, means `run` is being called safely in that case).
If the iterator is non-isolated, then it is still on the mutator's isolation; if the mutator is not isolated, then it is transactionally bound via the global concurrent queue (i.e. approximately `didSet`).
Thanks for expanding. I have one more scenario in mind where I would like to understand the buffering behaviour. What about this?
```swift
let person = Person(firstName: "", lastName: "")
let names = Observed { person.name }

Task.detached {
    for await name in names {
        print("Task1: \(name)")
    }
}

Task.detached {
    var iterator = names.makeAsyncIterator()
    await print("Task2: \(iterator.next())")
    try await Task.sleep(for: .seconds(100000))
    await print("Task2: \(iterator.next())")
}

var n = 0
while true {
    person.firstName = "\(n)"
    person.lastName = "\(n)"
    try? await Task.sleep(for: .seconds(0.001)) // specifically chosen to be rather short
    n += 1
}
```
The await is on the willSet trigger, not the value itself - that's the "magic" here. So it isn't per se dropping the values, but instead batching the transaction as dirty until the next available suspension; which, by you introducing work or a sleep, is after that point. This is the major reason why this can't really be implemented in terms of didSet: the transaction boundary wouldn't be able to determine a dirty state and would instead require a buffer of all values. Which, to be clear, is probably a different shape and most accurately a different type (you can build that today with AsyncStream, but you have to provide a buffering strategy etc.).
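For contrast, the "different shape" alluded to here - a didSet-style pipeline that keeps every value and therefore forces you to choose a buffering strategy up front - can be sketched today with `AsyncStream`. The producer side below is hand-rolled for illustration; it is not the Observation machinery.

```swift
// An explicit policy is required: .unbounded, .bufferingNewest(n), or
// .bufferingOldest(n). Unbounded keeps every element at the cost of
// unbounded memory growth while a consumer lags.
let (allValues, continuation) = AsyncStream.makeStream(
    of: String.self,
    bufferingPolicy: .unbounded
)

// A didSet-based producer would yield on every single write:
continuation.yield("0 0")
continuation.yield("1 1")
continuation.yield("2 2")
continuation.finish()

// Unlike the transactional Observed sequence, even a consumer that starts
// late (or runs slowly) sees every buffered element.
var received: [String] = []
for await value in allValues {
    received.append(value)
}
print(received) // ["0 0", "1 1", "2 2"]
```

This is the tradeoff the quoted paragraph describes: absolute truth for all values requires a buffer; transactional batching does not.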
The major goal here is to add a way to observe the values of a property with a view consistent with how it works for SwiftUI - so that same transactionality is brought across. I don't expect this interface to be a 100% panacea for all needs. Sometimes the interface that is needed might be in the form of needing absolute truth for ALL values each time they are set; that obviously isn't a transaction-based system and therefore requires a different (but perhaps related) system that this proposal does not cover.
In practice, however, I think this fills the role quite well for a large swath of use cases and outlines some of the feature sets we should consider for other similar systems in the future. Moreover, this approach keeps it consistent with existing systems and helps get some of those hard-to-reason-about bugs, like isolation issues, out of the mix (with some cost when it comes to more advanced concepts like bringing your own isolation or being in a non-isolated context). Just like with all concurrency work, it is a tradeoff, and there are going to be edges that might be unappealing for certain aspects; in practical use this seems to be the best of all the dozens of iterations I have done on this, and it seems to serve folks quite well for their stated use cases.
I understand that this would increase the scope of the original proposal… but it looks like there are two "tracks" evolving from the feedback here. Many engineers here have feedback and proposals for the behavior of the AsyncSequence implementation used by Observed. If Observed locked that implementation down to a private type, then it sounds like we would have to consider the feedback on the direction of the AsyncSequence implementation as "blocking" the evolution and adoption of Observed.
If Observed shipped agnostic of any one AsyncSequence implementation (but perhaps offered a default for engineers who choose to use it), then that extra flexibility might help the engineers here who are looking for more control over the behavior of that AsyncSequence.
I was expecting this to lead to two emissions, but instead it seems to break the sequence entirely at this point: it doesn't emit again, though it doesn't terminate either. It's also not deterministic; when I added various print statements to try to see where it stopped, it took a few attempts before it broke.
Am I being stupid here and missing something obvious? Or is it that the example code can't work properly without the bleeding-edge toolchain? I did have to add a stub for the non-isolated version of `next()`, but it still seems to work and call the isolated one. The stub isn't called when the problem happens either.
If the mutator is isolated, are changes still grouped together? How is the mutator's actor obtained? I don't recall the Observation API allowing this. Can I see the implementation? Edit: found it.
I've tested a scenario where Observed is not isolated but mutations happen isolated to an actor, and I don't see the changes grouped together.
So it seems that changes are grouped only if Observed is isolated, with an isolation matching the isolation of the mutations.
I would really prefer to have a version of Observed with a non-optional isolation and a non-sendable emit closure. If there is also demand for optional isolation, maybe two versions could be provided?
```swift
import Synchronization
import Observation
import Darwin

@Observable
final class Person: Sendable {
    private struct Impl: Sendable {
        var firstName: String
        var lastName: String
    }

    private let impl: Mutex<Impl>

    var firstName: String {
        get {
            access(keyPath: \.firstName)
            return impl.withLock { $0.firstName }
        }
        set {
            impl.withLock { state in
                withMutation(keyPath: \.firstName) {
                    state.firstName = newValue
                }
            }
        }
    }

    var lastName: String {
        get {
            access(keyPath: \.lastName)
            return impl.withLock { $0.lastName }
        }
        set {
            impl.withLock { state in
                withMutation(keyPath: \.lastName) {
                    state.lastName = newValue
                }
            }
        }
    }

    var name: String {
        access(keyPath: \.firstName)
        access(keyPath: \.lastName)
        return impl.withLock {
            $0.firstName + " " + $0.lastName
        }
    }

    init(firstName: String, lastName: String) {
        self.impl = Mutex(Impl(firstName: firstName, lastName: lastName))
    }
}
```
```swift
struct WeakRef {
    weak var p: Person?
}

@main
struct MainApp {
    static func main() async {
        var p: Person? = Person(firstName: "0", lastName: "0")
        let t: Task<Void, Never>
        do {
            let px = WeakRef(p: p!)
            t = Task.detached {
                let names = Observed { px.p?.name }
                for await name in names {
                    print("name = \(name)")
                }
            }
        }
        for i in 0..<10 {
            p?.firstName = "\(i)"
            usleep(1_000_000)
            p?.lastName = "\(i)"
            try! await Task.sleep(nanoseconds: 1_000_000_000)
        }
        p = nil
        await t.value
    }
}
```
This is definitely something I tried to get working; however, the concurrency support in the language today does not allow for this affordance. You are correct that if the language did support such a construct, it would perhaps work even more nicely by allowing the removal of the `@Sendable`. Honestly, that would be the more favorable incantation in my view.
Your example, however, does not provide a `Person` type that I would claim is actually `Sendable` if the expectation is that `name` is never torn. If the type is `Sendable`, Observation cannot magically group properties together across actor boundaries.
The reason there is an issue here is that the `let names = Observed { px.p?.name }` is being called on an unspecified isolation, and therefore the transactions are changes observed from the exterior of that.
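To make the tearing concern concrete, here is a deterministic sketch (the `LockedName` type is invented for illustration, mirroring the per-property locking discipline of the `Mutex`-based `Person` above): each setter takes the lock on its own, so the pair of writes is not atomic as a unit, and a read landing between the two writes observes a half-updated name.

```swift
import Foundation

// Invented for illustration: each property write is individually locked,
// but the two writes together form no critical region.
final class LockedName: @unchecked Sendable {
    private let lock = NSLock()
    private var first = "0"
    private var last = "0"

    func setFirst(_ s: String) { lock.lock(); first = s; lock.unlock() }
    func setLast(_ s: String) { lock.lock(); last = s; lock.unlock() }

    var name: String {
        lock.lock(); defer { lock.unlock() }
        return first + " " + last
    }
}

let person = LockedName()
person.setFirst("1")
// A concurrent reader scheduled between the two writes sees a torn value:
let torn = person.name
person.setLast("1")
print(torn)        // "1 0"
print(person.name) // "1 1"
```

The simulation serializes the interleaving for determinism; under real concurrency the same read simply happens whenever a reader wins the race between the two setter calls.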
One thing that can be done is to combine isolation + emit into an `@isolated(any) @Sendable` function.
```diff
diff --git a/stdlib/public/Observation/Sources/Observation/Observed.swift b/stdlib/public/Observation/Sources/Observation/Observed.swift
index 04c8aedfa39..9c00fc865ac 100644
--- a/stdlib/public/Observation/Sources/Observation/Observed.swift
+++ b/stdlib/public/Observation/Sources/Observation/Observed.swift
@@ -12,6 +12,7 @@
 #if STANDALONE
 import Observation
 #endif
+import _Concurrency
 import Synchronization
 /// An asychronous sequence generated from a closure that tracks the transactional changes of `@Observable` types.
@@ -99,9 +100,8 @@ public struct Observed<Element: Sendable, Failure: Error>: AsyncSequence, Sendab
     }
   }
-  let isolation: (any Actor)?
   let state: SharedState<State>
-  let work: @Sendable () throws(Failure) -> Element?
+  let work: @isolated(any) @Sendable () throws(Failure) -> Element?
   /// Constructs an asynchronous sequence for a given closure by tracking changes of `@Observable` types.
   ///
@@ -118,37 +118,22 @@ public struct Observed<Element: Sendable, Failure: Error>: AsyncSequence, Sendab
   ///   - emit: A closure to generate an element for the sequence.
   ///
   public init(
-    isolation: isolated (any Actor)? = #isolation,
-    @_inheritActorContext _ emit: @Sendable @escaping () throws(Failure) -> Element?
+    @_inheritActorContext _ emit: @isolated(any) @Sendable @escaping () throws(Failure) -> Element?
   ) {
-    self.isolation = isolation
-    self.work = { () throws(Failure) -> Element? in
-      if let isolation {
-        do {
-          return try isolation.assumeIsolated { _ in
-            try emit()
-          }
-        } catch {
-          throw error as! Failure
-        }
-      } else {
-        return try emit()
-      }
-    }
+    self.work = emit
     self.state = SharedState(State())
   }
   public struct Iterator: AsyncIteratorProtocol {
-    let isolation: (any Actor)?
     // the state ivar serves two purposes:
     // 1) to store a critical region of state of the mutations
     // 2) to idenitify the termination of _this_ sequence
     var state: SharedState<State>?
-    let work: @Sendable () throws(Failure) -> Element?
+    let work: @isolated(any) @Sendable () throws(Failure) -> Element?
     // this is the primary implementation of the tracking
     // it is bound to be called on the specified isolation of the construction
-    fileprivate static func trackEmission(isolation trackingIsolation: isolated (any Actor)?, state: SharedState<State>, work: @escaping @Sendable () throws(Failure) -> Element?) throws(Failure) -> Element? {
+    fileprivate static func trackEmission(isolation trackingIsolation: isolated (any Actor)?, state: SharedState<State>, work: @escaping () throws(Failure) -> Element?) throws(Failure) -> Element? {
      // this ferries in an intermediate form with Result to skip over `withObservationTracking` not handling errors being thrown
      // particularly this case is that the error is also an iteration state transition data point (it terminates the sequence)
      // so we need to hold that to get a chance to catch and clean-up
@@ -181,7 +166,7 @@ public struct Observed<Element: Sendable, Failure: Error>: AsyncSequence, Sendab
       if State.startTracking(state) {
         guard !Task.isCancelled else { return nil }
         // start by directly tracking the emission via a withObservation tracking on the isolation specified fro mthe init
-        guard let element = try await Iterator.trackEmission(isolation: isolation, state: state, work: work) else {
+        guard let element = try await Iterator.trackEmission(isolation: work.isolation, state: state, work: work) else {
           // the user returned a nil from the closure so terminate the sequence
           return try terminate()
         }
@@ -207,7 +192,7 @@ public struct Observed<Element: Sendable, Failure: Error>: AsyncSequence, Sendab
       }
       // re-prime the pump for the observation tracking
-      guard let element = try await Iterator.trackEmission(isolation: isolation, state: state, work: work) else {
+      guard let element = try await Iterator.trackEmission(isolation: work.isolation, state: state, work: work) else {
         // again ensure the user can terminate by returning nil
         return try terminate()
       }
@@ -221,6 +206,6 @@ public struct Observed<Element: Sendable, Failure: Error>: AsyncSequence, Sendab
   }
   public func makeAsyncIterator() -> Iterator {
-    Iterator(isolation: isolation, state: state, work: work)
+    Iterator(state: state, work: work)
   }
 }
```
This would allow the use of `@Sendable` but actor-isolated functions, which can capture non-sendable objects; that is already an improvement.
It still allows passing non-isolated `@Sendable` functions. I'm not sure if that is a good or a bad thing.