How to know when observing an AsyncSequence

I have many use cases where I need to know when an AsyncSequence is being observed. Some of them are related to testing workflows; others are in production code.

In my latest case I have an actor that has to know once observation of a sequence has started:

actor Observer<S: AsyncSequence> {
    let sequence: S
    
    init(sequence: S) {
        self.sequence = sequence
    }
    
    private var isObserving = false
    
    func startObserving() async throws {
        guard !isObserving else { return }
        isObserving = true
        
        Task {
            for try await element in sequence {
                // Do something with each element...
                print("-- element: \(element)")
            }
        }
    }
    
    func doSomething() async throws {
        try await startObserving()
        // At this point, I need to know for certain that the async sequence is being observed.
    }
    
}

Obviously this won't work: startObserving() returns as soon as the Task is created, which may be before the task has started iterating the sequence. So I tried something like this:

func startObserving() async throws {
    guard !isObserving else { return }
    isObserving = true

    Task {
        for try await element in sequence {
            // Do something with each element...
            print("-- element: \(element)")
        }
    }

    // Yield to make sure task has started.
    await Task.yield()
}

And that works, sometimes. But I need it to work all the time.

So then I tried this:

class BoolWrapper {
    var value: Bool
    init(_ value: Bool) { self.value = value }
}

func startObserving() async throws {
    guard !isObserving else { return }
    isObserving = true

    let taskStarted = BoolWrapper(false)

    Task {
        taskStarted.value = true
        for try await element in sequence {
            // Do something with each element...
            print("-- element: \(element)")
        }
    }

    // Yield until the task reports that it has started.
    while !taskStarted.value {
        await Task.yield()
    }
}

I'm not even sure that works all the time, and it's starting to feel like a hack.

Has anybody else faced this and is there a solution?
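For comparison, the polling loop could be replaced by a one-shot signal that the caller awaits without spinning or blocking a thread. The following is only a sketch under assumptions: the name SignalingObserver, the `started` stream, and the error handling are illustrative and not from the original post, the sequence is assumed to be Sendable, and it is written against Swift 5.9 (for AsyncStream.makeStream) in Swift 5 language mode. It only establishes that the task has begun running, not that the sequence is already being iterated.

```swift
// Hypothetical variant of the Observer actor from the question.
actor SignalingObserver<S: AsyncSequence & Sendable> {
    private let sequence: S
    private(set) var isObserving = false

    init(sequence: S) {
        self.sequence = sequence
    }

    func startObserving() async {
        guard !isObserving else { return }
        isObserving = true

        // One-shot signal: the task yields a single value once it is running.
        let (started, startedContinuation) = AsyncStream<Void>.makeStream()

        Task {
            startedContinuation.yield()
            startedContinuation.finish()
            do {
                for try await element in sequence {
                    // Do something with each element...
                    print("-- element: \(element)")
                }
            } catch {
                print("-- observation failed: \(error)")
            }
        }

        // Suspends until the task has signalled that it started.
        for await _ in started { break }
    }
}
```

Because the signal is sent before the `for try await` loop begins, this has the same gap between "task started" and "iteration started" as the flag-based version.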

Have you tried waiting for an event flag, which is set when the observation starts?

import Foundation

actor Observer<S: AsyncSequence> {
    let sequence: S
    
    init (sequence: S) {
        self.sequence = sequence
    }
    
    private var isObserving = false
    
    func startObserving (started eventFlag: DispatchSemaphore) async throws {
        guard !isObserving else { return }
        isObserving = true
        
        Task {
            // started
            eventFlag.signal()

            for try await element in sequence {
                // Do something with each element...
                print ("-- element: \(element)")
            }
        }
    }
    
    func doSomething() async throws {
        let eventFlag = DispatchSemaphore (value: 0)

        try await startObserving (started: eventFlag)
        // At this point, I need to know for certain that the async sequence is being observed.
        eventFlag.wait()
    }
}

Working through a related problem recently, I found that the way to overcome the testing-related problems with startup of an AsyncSequence was to ensure that the code within startObserving was called in the init method of the owner, rather than after initialization, so in your case as part of the init method of your Observer actor.

While the rest of the long thread would be a red herring, this specific post lays out the code to do that, and the test results: Reliably testing code that adopts Swift Concurrency? - #73 by babbage (Later: I see now you actually participated in the early part of that thread, but the linked post was a late follow-up.)

If you still need to be able to turn on and off whether the Observer actually acts on observations, you could have a Bool like shouldObserve, toggle it on and off, and put guard shouldObserve else { return } as the first line inside for try await element in sequence... However, because it is an actor, the order in which that toggle and the observed events are processed would likely be non-deterministic. So it would probably be better to instantiate an actor whenever you want to be observing, start observing in its init method, and then discard the actor instance when you no longer want to observe.
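A minimal sketch of the shouldObserve idea, under assumptions: the names ToggleableObserver, setShouldObserve, run, and handledCount are hypothetical, the sequence is assumed to be Sendable, and `continue` is used instead of `return` so that the loop keeps running while observation is switched off.

```swift
// Hypothetical actor illustrating a shouldObserve toggle.
actor ToggleableObserver<S: AsyncSequence & Sendable> {
    private let sequence: S
    private var shouldObserve = true
    // Counts the elements that were acted on (illustration only).
    private(set) var handledCount = 0

    init(sequence: S) {
        self.sequence = sequence
    }

    func setShouldObserve(_ value: Bool) {
        shouldObserve = value
    }

    // Iterates the sequence until it finishes, skipping elements
    // that arrive while shouldObserve is false.
    func run() async throws {
        for try await element in sequence {
            guard shouldObserve else { continue }
            handledCount += 1
            print("-- element: \(element)")
        }
    }
}
```

Each suspension inside the loop lets other calls on the actor, such as setShouldObserve, interleave with element handling, which is where the ordering uncertainty mentioned above comes from.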

Would be interested to hear if this slightly different approach to your underlying problem is a solution for you or not. :slight_smile:

Thank you for the suggestion. I believe that is the same thing as the let taskStarted = BoolWrapper(false) solution in my original post. I think it mostly works. However it seems that between setting the flag and observation there is a chance for a race.

Thank you @babbage for the suggestion. I originally had the observation taking place in the init of the actor; however, I need the actor's init to be synchronous. It will be an error in Swift 6 to create a Task and capture self in the synchronous init of an actor. So I would need to make the init async, which ends up affecting a lot of our other code negatively.

Also, I understand your linked code passes 1,000,000 test runs, but I don't know that it will deterministically pass every time and on every processor. I see pretty similar results with strategic uses of Task.yield() on my M1 MacBook Pro, but those don't always translate to my colleague's Intel MacBook Pro. In your case it seems like there could be a race between the unstructured task spawned in the view model init and the code that relies on it.

No, using a DispatchSemaphore is guaranteed to work 100 percent provided it does not interfere with the inner workings of the actor.

Give it a try and see what happens - you won't set off a nuclear reaction :slight_smile:

Do you have a link to the info on this? I'd like to read more.

Try this:

actor Foo {
    init() {
        Task {
            await bar()
        }
    }

    func bar() {}
}

You should see this warning:

Actor 'self' can only be captured by a closure from an async initializer; this is an error in Swift 6


@ibex10 Thank you for your suggestion. It seems to me however that there could be a race between these 2 lines:

eventFlag.signal()

for try await element in sequence {
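One way to narrow that gap is to create the iterator explicitly before signalling, so that the signal is sent only after makeAsyncIterator() has returned. This is only a sketch under assumptions: IteratorFirstObserver and the `started` stream are hypothetical names, the sequence is assumed to be Sendable, and the code targets Swift 5.9 in Swift 5 language mode. As far as I know, the AsyncSequence protocol does not say whether a sequence starts delivering at makeAsyncIterator() or only at the first call to next(), so whether this counts as "being observed" depends on the concrete sequence type.

```swift
// Hypothetical variant that signals only after the iterator exists.
actor IteratorFirstObserver<S: AsyncSequence & Sendable> {
    private let sequence: S
    private(set) var isObserving = false

    init(sequence: S) {
        self.sequence = sequence
    }

    func startObserving() async {
        guard !isObserving else { return }
        isObserving = true

        let (started, startedContinuation) = AsyncStream<Void>.makeStream()

        Task {
            // Create the iterator first, then signal.
            var iterator = sequence.makeAsyncIterator()
            startedContinuation.yield()
            startedContinuation.finish()
            do {
                // Manual equivalent of `for try await element in sequence`.
                while let element = try await iterator.next() {
                    print("-- element: \(element)")
                }
            } catch {
                print("-- observation failed: \(error)")
            }
        }

        // Suspends until the iterator has been created inside the task.
        for await _ in started { break }
    }
}
```

For a sequence that only subscribes on its first next() call, the gap is still there, just smaller.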

Thanks for that further info re triggering the warning.

I take your point about the non-deterministic unstructured Task. In an async method one could wait for it to return in order to avoid that but obviously that's not a solution here. Calling Task.yield() in unit tests seems like a code smell to me. I'll take the 1,000,000 test passes over the alternative in the meantime. :slightly_smiling_face:

Wait a minute! :slight_smile: I don't understand why there could be a race here, :confused:. Please elaborate.

Unfortunately it can't go in the loop because that wouldn't be triggered until the first element is emitted by the async sequence.

The issue is that it might pass for me on my M1 MacBook Pro, but things like this often fail for my colleague on his late-model Intel MacBook Pro.