'Consuming an AsyncStream from multiple Tasks' Redux

hi! this is basically a follow-up question to this post from a couple years ago. i'm trying to understand what the intended behavior of AsyncStream is when being 'consumed' via multiple (concurrent) iterators. the evolution document for AsyncStream states:

As with any sequence, iterating over an AsyncStream multiple times, or creating multiple iterators and iterating over them separately, may produce an unexpected series of values. Neither AsyncStream nor its iterator are @Sendable types, and concurrent iteration is considered a programmer error.

additionally the docs for AsyncStream.Iterator state:

This type doesn’t conform to Sendable. Don’t use it from multiple concurrent contexts. It is a programmer error to invoke next() from a concurrent context that contends with another such call, which results in a call to fatalError().

however, one can seemingly iterate over an AsyncStream from multiple Tasks, and there is neither compile-time nor runtime feedback that this may violate the API (at least that i could see). for example:

func multiConsume() async {
    var startStream: (() -> Void)!
    let stream = AsyncStream<Int> { continuation in
        startStream = {
            Task {
                for i in 0..<1_000 {
                    try! await Task.sleep(nanoseconds: 1_000_000)
                    continuation.yield(i)
                }
                continuation.finish()
            }
        }
    }

    let t1 = Task {
        // this races with starting the stream but still illustrates the scenario in question
        for await value in stream {
            print ("task 1: \(value)")
        }
    }

    let t2 = Task {
        for await value in stream {
            print ("task 2: \(value)")
        }
    }

    startStream()

    _ = await (t1.value, t2.value)
}

this example will consume the elements of the stream, but which one is consumed by which for await loop is arbitray. however, if we swap out the AsyncStream with an AsyncThrowingStream, then the code will generally crash at runtime with the alluded-to fatal error 'attempt to await next() on more than one task'.

is there a reason for this behavioral difference between the two flavors of AsyncStream, or should they both have runtime checks to attempt to prevent concurrent iteration?

3 Likes

I have called out this behavioural difference in my backpressured async stream proposal SE-0406. I agree that this is confusing and we should rectify the implementation; however, there are applications that depend on the current behaviour and enforcing the fatal error might break those.

It is my opinion, that Async[Throwing]Stream is a single consumer async sequence that should support only a single iterator to be created and that iterator doesn't allow concurrent consumption. In fact, the current implementation somewhat enforces this since a cancellation of a single call to next() will terminate the whole stream. I can recommend taking a look at my PR over in swift-async-algorithms where I am intending to land an externally backpressured single consumer async sequence.

Coming back to the root of your question. I agree that implementing a multi consumer async algorithm is important and I have a few approaches lying around. The main question when implementing such an algorithm is how the back pressured should be applied i.e. should the slowest consumer dictate the speed or should slow consumer buffer elements potentially ballooning the memory of a process.

4 Likes

thanks for the response and the pointers to other relevant information! as a slight aside, that cancellation behavior with multiple consumers is... quite surprising!

after reading through your referenced proposal, the following stood out to me from the 'muti/single consumer support' section:

AsyncStream does support the creation of multiple iterators and it does handle multiple consumers correctly

after playing around with various permutations of AsyncStream with multiple consumers and reading through the implementation a bit, i'm wondering if you could help me understand another related issue. consider the following:

func multiConsume() async {
    let (stream, continuation) = AsyncStream<Int>.makeStream()

    await withTaskGroup(of: Void.self) { group in
        for i in 1...10 {
            group.addTask {
                print("starting iterator \(i)")
                for await value in stream {
                    print("iterator \(i): \(value)")
                }
                print("iterator \(i) finished")
            }
        }

        group.addTask {
            print("terminating stream")
            continuation.finish()
        }
    }
}

when i run this code locally, it appears to generally 'hang' indefinitely, for lack of a better description. my suspicion is that finish()ing the stream doesn't actually propagate that information to all the stored continuations that may have accumulated from multiple consumers, so they end up being 'leaked' and just sitting around un-terminated.

This does indeed hang and looks like a bug in AsyncStream. Could you please file an issue on the swift repo including your reproducer.

filed here. discovered this very similar existing report in the process.