Use cases operating with multiple AsyncStream
's may need to store multiple continuations. When handling onTermination
callback, client code needs to remove relevant continuation.
To identify relevant continuation, client code needs to be able to compare continuations.
It is possible to associate lookup key with each continuation, but this is inefficient. AsyncStream.Continuation
already stores a reference to AsyncStream._Storage
whose identity can be used to provide simple and efficient Hashable
conformance.
Consider this simple Observer pattern with AsyncSequence
-based API. To avoid implementing AsyncSequence
from scratch it uses AsyncStream
as a building block. To support multiple subscribes new stream is returned every time.
@MainActor private class Sender {
var value: Int = 0 {
didSet {
for c in continuations {
c.yield(value)
}
}
}
var values: some AsyncSequence<Int, Never> {
AsyncStream<Int>(bufferingPolicy: .bufferingNewest(1)) { continuation in
continuation.yield(value)
self.continuations.insert(continuation)
continuation.onTermination = { _ in
DispatchQueue.main.async {
self.continuations.remove(continuation)
}
}
}
}
private var continuations: Set<AsyncStream<Int>.Continuation> = []
}
Without Hashable
conformance, continuation needs to be associated with articial identifier, e.g. object identity by wrapping each continuation in a class:
@MainActor private class Sender {
var value: Int = 0 {
didSet {
for c in continuations {
c.value.yield(value)
}
}
}
var values: some AsyncSequence<Int, Never> {
AsyncStream<Int> { (continuation: AsyncStream<Int>.Continuation) -> Void in
continuation.yield(value)
let box = ContinuationBox(value: continuation)
self.continuations.insert(box)
continuation.onTermination = { _ in
DispatchQueue.main.async {
self.continuations.remove(box)
}
}
}
}
private var continuations: Set<ContinuationBox> = []
private final class ContinuationBox: Hashable, Sendable {
let value: AsyncStream<Int>.Continuation
init(value: AsyncStream<Int>.Continuation) {
self.value = value
}
static func == (lhs: Sender.ContinuationBox, rhs: Sender.ContinuationBox) -> Bool {
lhs === rhs
}
func hash(into hasher: inout Hasher) {
hasher.combine(ObjectIdentifier(self))
}
}
}
Note that capturing continuation
or box
in onTermination
is safe, because onTermination
is dropped after being called. And it is always called, even if AsyncStream
is discarded without even being iterated.