Hello! I am currently experimenting replacing our use of ReactiveSwift with async/await.
One pattern we use in RS is to have a "Signal" type which is shared between a few different consumers to run different pipelines. When replacing those Signals with AsyncStream, I ran into this error: _Concurrency/AsyncStreamBuffer.swift:253: Fatal error: attempt to await next() on more than one task.
Example playground code:
import _Concurrency
let stream = AsyncStream<Int> { continuation in
Task.detached {
for _ in 0..<100 {
try! await Task.sleep(nanoseconds: 1_000_000_000)
continuation.yield(Int.random(in: 1...10))
}
continuation.finish()
}
}
Task.detached {
for await random in stream {
print ("\(random)")
}
}
Task.detached {
for await random in stream {
print ("\(random)")
}
}
I've tried a few different workarounds like wrapping the AsyncStream in another class which returns copies from makeAsyncIterator(), but haven't found one that works yet.
I'm not sure any AsyncSequence is meant to have multiple consumers or whether that's left up to the sequence, but as you've found, AsyncStream doesn't support it. (@Philippe_Hausler or @David_Smith can speak to that more directly.) I believe the suggested solution is usually to vend separate streams for each consumer. I think creating your own wrapping sequence which supports multiple consumers may also work, if that's not a violation of the AsyncSequence contract.
@Jon_Shier is correct; there are not currently any many-consumer AsyncSequence types yet. That pattern is useful in ReactiveSwift or similar systems. For example: in Combine it is called share.
I have been looking at stuff in this area and there are two different forms from what I have seen of AsyncSequence analogs; either you are sharing an iterator, or you are splitting off values.
Sharing an iterator means that each next call consumes a produced value. Splitting off values means that you know ahead of time how many interested parties and they all get the same value and await until all have read before moving on to the next value.
Long story turned short: there is a missing part that we need to design to handle multiple consumers of AsyncSequences, specifically AsyncStream.
In addition to split off, you can also have the structure similar to Disruptors where multiple consumers will consume each element in parallel but they don't need to move in lockstep before taking the next value.
An implementation with this pattern for Swift Concurrency / Async sequences would be pretty cool, as the devil is in the details there for great performance.
Hello, I ran into a similar problem as @gabeshahbazian I've looked at SharedAsyncSequence Structure Reference and it seems quite handy - thanks, ultimately though I think under the hood, this is just creating multiple streams.
I was wondering if there has been any development on this since January. I think there a lot of use cases for a to-many relationship.