I have many use cases where I need to know when an AsyncSequence is being observed. Some of these use cases are related to testing workflows; others are related to production code.
In my latest case I have an actor that has to know once observation of a sequence has started:
actor Observer<S: AsyncSequence> {
    let sequence: S

    init(sequence: S) {
        self.sequence = sequence
    }

    private var isObserving = false

    func startObserving() async throws {
        guard !isObserving else { return }
        isObserving = true
        Task {
            for try await element in sequence {
                // Do something with each element...
                print("-- element: \(element)")
            }
        }
    }

    func doSomething() async throws {
        try await startObserving()
        // At this point, I need to know for certain that the async sequence is being observed.
    }
}
Obviously this won't work because of the Task that is created for the observation. So I tried something like this:
func startObserving() async throws {
    guard !isObserving else { return }
    isObserving = true
    Task {
        for try await element in sequence {
            // Do something with each element...
            print("-- element: \(element)")
        }
    }
    // Yield to make sure task has started.
    await Task.yield()
}
And that works, sometimes. But I need it to work all the time.
So then I tried this:
class BoolWrapper {
    var value: Bool
    init(_ value: Bool) { self.value = value }
}

func startObserving() async throws {
    guard !isObserving else { return }
    isObserving = true
    let taskStarted = BoolWrapper(false)
    Task {
        taskStarted.value = true
        for try await element in sequence {
            // Do something with each element...
            print("-- element: \(element)")
        }
    }
    // Yield until the task has started.
    while !taskStarted.value {
        await Task.yield()
    }
}
And I'm not even sure that works all the time, and it's starting to feel like a hack.
Has anybody else faced this and is there a solution?
Have you tried waiting for an event flag, which is set when the observation starts?
import Foundation

actor Observer<S: AsyncSequence> {
    let sequence: S

    init(sequence: S) {
        self.sequence = sequence
    }

    private var isObserving = false

    func startObserving(started eventFlag: DispatchSemaphore) async throws {
        guard !isObserving else { return }
        isObserving = true
        Task {
            // started
            eventFlag.signal()
            for try await element in sequence {
                // Do something with each element...
                print("-- element: \(element)")
            }
        }
    }

    func doSomething() async throws {
        let eventFlag = DispatchSemaphore(value: 0)
        try await startObserving(started: eventFlag)
        // At this point, I need to know for certain that the async sequence is being observed.
        eventFlag.wait()
    }
}
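One caveat with the semaphore version: `eventFlag.wait()` blocks the calling thread instead of suspending, and because the `Task` is created inside an actor method it may need that same actor to run, so the wait may never return. A non-blocking variant of the same "event flag" idea can be sketched with `withCheckedContinuation`. The helper name `observe(signaling:)`, the added `Sendable` constraint, and the `private(set)` access level are assumptions made for this sketch, not part of the code above.

```swift
actor Observer<S: AsyncSequence & Sendable> {
    private let sequence: S
    private(set) var isObserving = false

    init(sequence: S) {
        self.sequence = sequence
    }

    func startObserving() async {
        guard !isObserving else { return }
        isObserving = true
        // Suspends (without blocking a thread) until `started.resume()` is called.
        await withCheckedContinuation { (started: CheckedContinuation<Void, Never>) in
            Task { await self.observe(signaling: started) }
        }
    }

    // Hypothetical helper: signals that the task body is running, then iterates.
    private func observe(signaling started: CheckedContinuation<Void, Never>) async {
        started.resume()
        do {
            for try await element in sequence {
                print("-- element: \(element)")
            }
        } catch {
            print("-- observation failed: \(error)")
        }
    }
}
```

As with any flag set at the top of the task, this only shows that the task body has begun, not that the first `next()` call has been made. A second caller that arrives while the first is still suspended also returns immediately from the `guard`.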
Working through a related problem recently, I found that the way to overcome the testing-related problems with startup of an AsyncSequence was to ensure that the code within startObserving was called in the init method of the owner, rather than after initialization, so in your case as part of the init method of your Observer actor.
While the rest of the long thread would be a red herring, this specific post lays out the code to do that, and the test results: Reliably testing code that adopts Swift Concurrency? - #73 by babbage (Later: I see now you actually participated in the early part of that thread, but the linked post was a late follow-up.)
If you still need to be able to turn on and off whether or not the Observer actually acts on observations, you could still have a bool like shouldObserve, toggle that on and off again, and then guard shouldObserve else { return } as the first line of for try await element in sequence... However, the order of toggling that vs. receiving the events it is observing would likely be non-deterministic, given it is an actor. So more likely it would be better to instantiate an actor whenever you want to be observing, start observing in its init method, and then discard the actor instance when you no longer want to be observing.
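The "create it to start, discard it to stop" idea can be sketched roughly as follows. This is only an illustration under assumptions: it uses a plain class rather than an actor, and the names `ObservationToken`, `handle`, `cancel()`, and `isCancelled` are hypothetical. Starting the task in `init` does not by itself guarantee that iteration has begun by the time `init` returns.

```swift
/// Hypothetical token: observation starts when it is created and is
/// cancelled when it is cancelled explicitly or deallocated.
final class ObservationToken {
    private let task: Task<Void, Never>

    init<S: AsyncSequence & Sendable>(
        observing sequence: S,
        handle: @escaping @Sendable (S.Element) -> Void
    ) {
        // The closure does not capture `self`, only the sequence and handler.
        task = Task {
            do {
                for try await element in sequence {
                    handle(element)
                }
            } catch {
                // In this sketch, an error simply ends the observation.
            }
        }
    }

    var isCancelled: Bool { task.isCancelled }

    func cancel() { task.cancel() }

    deinit { task.cancel() }
}
```

Holding the token keeps the observation alive; dropping the last reference (or calling `cancel()`) requests cancellation of the underlying task.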
Would be interested to hear if this slightly different approach to your underlying problem is a solution for you or not.
Thank you for the suggestion. I believe that is the same thing as the let taskStarted = BoolWrapper(false) solution in my original post. I think it mostly works. However it seems that between setting the flag and observation there is a chance for a race.
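One way to narrow that window is to move the signal out of the task body and into the iteration itself, by wrapping the sequence so that a callback fires immediately before the first `next()` is forwarded to the underlying iterator. The sketch below assumes this is late enough for the sequence in question, which depends on when that sequence actually registers its consumer; the names `StartSignalingSequence`, `onFirstNext`, and `observe(_:handle:)` are hypothetical.

```swift
/// Hypothetical wrapper: forwards to `Base` and calls `onFirstNext`
/// right before the first `next()` on the base iterator.
struct StartSignalingSequence<Base: AsyncSequence>: AsyncSequence {
    typealias Element = Base.Element

    let base: Base
    let onFirstNext: @Sendable () -> Void

    struct AsyncIterator: AsyncIteratorProtocol {
        var baseIterator: Base.AsyncIterator
        let onFirstNext: @Sendable () -> Void
        var didSignal = false

        mutating func next() async throws -> Base.Element? {
            if !didSignal {
                didSignal = true
                onFirstNext()
            }
            return try await baseIterator.next()
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(baseIterator: base.makeAsyncIterator(), onFirstNext: onFirstNext)
    }
}

extension StartSignalingSequence: Sendable where Base: Sendable {}

/// Starts observing and returns only after the iterator's first `next()` call
/// has been reached. The returned task can be used to cancel the observation.
func observe<S: AsyncSequence & Sendable>(
    _ sequence: S,
    handle: @escaping @Sendable (S.Element) -> Void
) async -> Task<Void, Error> {
    var task: Task<Void, Error>?
    await withCheckedContinuation { (started: CheckedContinuation<Void, Never>) in
        let wrapped = StartSignalingSequence(base: sequence, onFirstNext: { started.resume() })
        task = Task {
            for try await element in wrapped {
                handle(element)
            }
        }
    }
    return task!
}
```

The signal still fires just before the base iterator's `next()` runs rather than after it has suspended, so a sequence that drops elements until a consumer is actually waiting could still miss one.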
Thank you @babbage for the suggestion. I originally had the observation taking place in the init of the actor, however I need the actor's init to be synchronous. It will be an error in Swift 6 to create a Task and capture self in the synchronous init of an actor. So I would need to make the init async, which ends up affecting a lot of our other code negatively.
Also, I understand your linked code passes 1,000,000 test runs, but I don't know that it's deterministically going to pass every time and on every processor. I see pretty similar results with strategic uses of Task.yield() on my M1 MacBook Pro, but those don't always translate to my colleague's Intel MacBook Pro. In your case it seems like there could be a race between the unstructured task spawned in the view model init and the code that relies on it.
Thanks for that further info re triggering the warning.
I take your point about the non-deterministic unstructured Task. In an async method one could wait for it to return in order to avoid that but obviously that's not a solution here. Calling Task.yield() in unit tests seems like a code smell to me. I'll take the 1,000,000 test passes over the alternative in the meantime.