Consuming an AsyncStream from multiple Tasks

Hello! I am currently experimenting replacing our use of ReactiveSwift with async/await.

One pattern we use in RS is to have a "Signal" type which is shared between a few different consumers to run different pipelines. When replacing those Signals with AsyncStream, I ran into this error: _Concurrency/AsyncStreamBuffer.swift:253: Fatal error: attempt to await next() on more than one task.

Example playground code:

import _Concurrency

let stream = AsyncStream<Int> { continuation in
    Task.detached {
         for _ in 0..<100 {
             try! await Task.sleep(nanoseconds: 1_000_000_000)
             continuation.yield(Int.random(in: 1...10))
         }
         continuation.finish()
     }
 }

Task.detached {
    for await random in stream {
        print ("\(random)")
    }
}

Task.detached {
    for await random in stream {
        print ("\(random)")
    }
}

I've tried a few different workarounds like wrapping the AsyncStream in another class which returns copies from makeAsyncIterator(), but haven't found one that works yet.


The main use case for the one-to-many AsyncStream is a property wrapper that can be observed on for changes. I have a proof of concept for this idea here: playgrounds/AsyncProperty.swift at 380b21991fe7468185286a038e9d38f62bbb9e9a · gshahbazian/playgrounds · GitHub

Any pointers on another way to form this type of relationship with swift concurrency (without dropping into another framework like Combine)?

3 Likes

I'm not sure any AsyncSequence is meant to have multiple consumers or whether that's left up to the sequence, but as you've found, AsyncStream doesn't support it. (@Philippe_Hausler or @David_Smith can speak to that more directly.) I believe the suggested solution is usually to vend separate streams for each consumer. I think creating your own wrapping sequence which supports multiple consumers may also work, if that's not a violation of the AsyncSequence contract.

3 Likes

@Jon_Shier is correct; there are not currently any many-consumer AsyncSequence types yet. That pattern is useful in ReactiveSwift or similar systems. For example: in Combine it is called share.

I have been looking at stuff in this area and there are two different forms from what I have seen of AsyncSequence analogs; either you are sharing an iterator, or you are splitting off values.

Sharing an iterator means that each next call consumes a produced value. Splitting off values means that you know ahead of time how many interested parties and they all get the same value and await until all have read before moving on to the next value.

Long story turned short: there is a missing part that we need to design to handle multiple consumers of AsyncSequences, specifically AsyncStream.

7 Likes

In addition to split off, you can also have the structure similar to Disruptors where multiple consumers will consume each element in parallel but they don't need to move in lockstep before taking the next value.

If you aren't familiar with it, the Disrupt pattern is a high-performance scalable solution for that use case - source available at Github for the reference implementation and there also a Swift implementation there. There's tons of articles around it, one introduction here

An implementation with this pattern for Swift Concurrency / Async sequences would be pretty cool, as the devil is in the details there for great performance.

This was recently shared with me when asking about a similar issue: SharedAsyncSequence.
Maybe it helps ;)

5 Likes

@Alejandro_Martinez Thanks for sharing! Looks like that pretty much achieves everything we need right now.

I've updated my AsyncProperty type to use it on this branch: With Asynchrone package · gshahbazian/playgrounds@688b21d · GitHub

1 Like

Hello, I ran into a similar problem as @gabeshahbazian I've looked at SharedAsyncSequence Structure Reference and it seems quite handy - thanks, ultimately though I think under the hood, this is just creating multiple streams.

I was wondering if there has been any development on this since January. I think there a lot of use cases for a to-many relationship.

My original question was here: Pattern for AsyncStream - to many … | Apple Developer Forums

Thank you.

@isenwald2020 yes, a new version has just been released on August 9.

It's all that I needed for a new project, instead of writing all of this myself, Red has done it over the course of this year!

I was so happy I decided to sponsor him.

Guys, you could also have a look at this: GitHub - sideeffect-io/AsyncExtensions: AsyncExtensions aims to mimic Swift Combine operators for async sequences.

1 Like

Good to see a few different people working on this.

I wonder if the people writing AsyncAlgorithms will address these gaps...

Are there any plans to create a consistent solution? The Asyncrone library isn't really being supported...

2 Likes

Thank you!