When Swift Concurrency was first introduced, the main use case for it (for me) was handling one-time callbacks that were previously usually processed with closure APIs (and DispatchQueue and other abstractions under the hood).
However, from the very beginning there were AsyncSequence APIs, that can be consumed by multiple or infinite number of clients, similar to how Combine publishers work. However, it was unclear to me how to write such APIs if not by wrapping other older APIs, as default AsyncStream and AsyncSequence are consumed by a single client.
I imagine one way to have broadcast / multicast streams is to create multiple AsyncChannel objects, store them in the array and call each time when the 'main' Stream or Sequence is changed. Is this currently the preferred way to create a broadcast stream, or there's some part that I am missing?
It depends on both (a) what you define as a “client”; and (b) the implementation of the AsyncSequence
.
A Combine Publisher
has the explicit notion of a Subscriber
and Subscription
, so it can manage the state of what has been received by a given Subscriber
. But, AsyncSequence
does not have the concepts of subscribers/subscriptions. Instead, it has “iterators”, and if single sequence has multiple iterators, each can have its own state.
Thus, you can create an AsyncSequence
with multiple AsyncIterator
instances. E.g., if you have two separate for await value in sequence {…}
loops, each will have its own iterator, potentially each with its own state.
Consider the following, in which we create a single AsyncSequence
, but the two loops are creating their own iterators, and thus each will get the full series of values from their respective iterator:
/// Asynchronous sequence to yield values roughly every second
///
/// Adapted from https://developer.apple.com/documentation/swift/asynciteratorprotocol
struct Counter: AsyncSequence {
typealias Element = Int
let howHigh: Int
struct AsyncIterator: AsyncIteratorProtocol {
let howHigh: Int
var current = 1
mutating func next() async -> Int? {
do {
try await Task.sleep(for: .seconds(1))
} catch {
return nil
}
guard current <= howHigh else {
return nil
}
let result = current
current += 1
return result
}
}
func makeAsyncIterator() -> AsyncIterator {
return AsyncIterator(howHigh: howHigh)
}
}
let sequence = Counter(howHigh: 5)
// these both will return all five values each
Task {
for await value in sequence { // this effectively calls `makeAsyncIterator` and loops calling `next` on this iterator
print("a", value)
}
}
Task {
for await value in sequence { // this creates another iterator and loops through it
print("b", value)
}
}
But, not all AsyncSequence
implementations follow this pattern. Consider the following AsyncStream
implementation: Although it is an AsyncSequence
, it behaves differently, where it will loop through a single, shared sequence of values, despite have distinct iterators:
let stream = AsyncStream<Int> { continuation in
let task = Task {
for i in 1 ... 5 {
try await Task.sleep(for: .seconds(1))
continuation.yield(i)
}
continuation.finish()
}
continuation.onTermination = { state in
if case .cancelled = state {
task.cancel()
}
}
}
// these two will race to see which gets the next value in the sequence, despite having their own iterators
Task {
for await value in stream {
print("a", value)
}
}
Task {
for await value in stream {
print("b", value)
}
}
The example in the AsyncIteratorProtocol
documentation suggests that the state should be managed by the iterator. But the AsyncStream
implementation suggests it should be managed by the sequence, itself. It is not entirely clear which is the idiomatic approach.
I only mention it as a word of caution to avoid drawing broader conclusions about AsyncSequence
solely from AsyncStream
behaviors.
Yep, that’s a perfectly acceptable way. (Just make sure to synchronize the state for thread-safety.) Or, as in the AsyncIteratorProtocol
example, write your own AsyncSequence
that manages the state at the iterator level.