What is the current approach to Swift Concurrency multicast / broadcast use cases?

When Swift Concurrency was first introduced, the main use case for it (for me) was handling one-time callbacks that were previously usually processed with closure APIs (and DispatchQueue and other abstractions under the hood).
However, from the very beginning there were AsyncSequence APIs, that can be consumed by multiple or infinite number of clients, similar to how Combine publishers work. However, it was unclear to me how to write such APIs if not by wrapping other older APIs, as default AsyncStream and AsyncSequence are consumed by a single client.
I imagine one way to have broadcast / multicast streams is to create multiple AsyncChannel objects, store them in the array and call each time when the 'main' Stream or Sequence is changed. Is this currently the preferred way to create a broadcast stream, or there's some part that I am missing?

1 Like

It depends on both (a) what you define as a “client”; and (b) the implementation of the AsyncSequence.

A Combine Publisher has the explicit notion of a Subscriber and Subscription, so it can manage the state of what has been received by a given Subscriber. But, AsyncSequence does not have the concepts of subscribers/subscriptions. Instead, it has “iterators”, and if single sequence has multiple iterators, each can have its own state.

Thus, you can create an AsyncSequence with multiple AsyncIterator instances. E.g., if you have two separate for await value in sequence {…} loops, each will have its own iterator, potentially each with its own state.

Consider the following, in which we create a single AsyncSequence, but the two loops are creating their own iterators, and thus each will get the full series of values from their respective iterator:

/// Asynchronous sequence to yield values roughly every second
///
/// Adapted from https://developer.apple.com/documentation/swift/asynciteratorprotocol

struct Counter: AsyncSequence {
    typealias Element = Int
    let howHigh: Int

    struct AsyncIterator: AsyncIteratorProtocol {
        let howHigh: Int
        var current = 1

        mutating func next() async -> Int? {
            do {
                try await Task.sleep(for: .seconds(1))
            } catch {
                return nil
            }
            
            guard current <= howHigh else {
                return nil
            }

            let result = current
            current += 1
            return result
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        return AsyncIterator(howHigh: howHigh)
    }
}

let sequence = Counter(howHigh: 5)

// these both will return all five values each

Task {
    for await value in sequence { // this effectively calls `makeAsyncIterator` and loops calling `next` on this iterator
        print("a", value)
    }
}

Task {
    for await value in sequence { // this creates another iterator and loops through it
        print("b", value)
    }
}

But, not all AsyncSequence implementations follow this pattern. Consider the following AsyncStream implementation: Although it is an AsyncSequence, it behaves differently, where it will loop through a single, shared sequence of values, despite have distinct iterators:

let stream = AsyncStream<Int> { continuation in
    let task = Task {
        for i in 1 ... 5 {
            try await Task.sleep(for: .seconds(1))
            continuation.yield(i)
        }
        continuation.finish()
    }
    
    continuation.onTermination = { state in
        if case .cancelled = state {
            task.cancel()
        }
    }
}

// these two will race to see which gets the next value in the sequence, despite having their own iterators

Task {
    for await value in stream {
        print("a", value)
    }
}

Task {
    for await value in stream {
        print("b", value)
    }
}

The example in the AsyncIteratorProtocol documentation suggests that the state should be managed by the iterator. But the AsyncStream implementation suggests it should be managed by the sequence, itself. It is not entirely clear which is the idiomatic approach.

I only mention it as a word of caution to avoid drawing broader conclusions about AsyncSequence solely from AsyncStream behaviors.

Yep, that’s a perfectly acceptable way. (Just make sure to synchronize the state for thread-safety.) Or, as in the AsyncIteratorProtocol example, write your own AsyncSequence that manages the state at the iterator level.

3 Likes