[Pitch] Add Hashable conformance to AsyncStream.Continuation

Use cases operating with multiple AsyncStream's may need to store multiple continuations. When handling onTermination callback, client code needs to remove relevant continuation.

To identify relevant continuation, client code needs to be able to compare continuations.

It is possible to associate lookup key with each continuation, but this is inefficient. AsyncStream.Continuation already stores a reference to AsyncStream._Storage whose identity can be used to provide simple and efficient Hashable conformance.

Consider this simple Observer pattern with AsyncSequence-based API. To avoid implementing AsyncSequence from scratch it uses AsyncStream as a building block. To support multiple subscribes new stream is returned every time.

@MainActor private class Sender {
    var value: Int = 0 {
        didSet {
            for c in continuations {
                c.yield(value)
            }
        }
    }

    var values: some AsyncSequence<Int, Never> {
        AsyncStream<Int>(bufferingPolicy: .bufferingNewest(1)) { continuation in
            continuation.yield(value)
            self.continuations.insert(continuation)
            continuation.onTermination = { _ in
                DispatchQueue.main.async {
                    self.continuations.remove(continuation)
                }
            }
        }
    }

    private var continuations: Set<AsyncStream<Int>.Continuation> = []
}

Without Hashable conformance, continuation needs to be associated with articial identifier, e.g. object identity by wrapping each continuation in a class:

@MainActor private class Sender {
    var value: Int = 0 {
        didSet {
            for c in continuations {
                c.value.yield(value)
            }
        }
    }

    var values: some AsyncSequence<Int, Never> {
        AsyncStream<Int> { (continuation: AsyncStream<Int>.Continuation) -> Void in
            continuation.yield(value)
            let box = ContinuationBox(value: continuation)
            self.continuations.insert(box)
            continuation.onTermination = { _ in
                DispatchQueue.main.async {
                    self.continuations.remove(box)
                }
            }
        }
    }

    private var continuations: Set<ContinuationBox> = []

    private final class ContinuationBox: Hashable, Sendable {
        let value: AsyncStream<Int>.Continuation

        init(value: AsyncStream<Int>.Continuation) {
            self.value = value
        }

        static func == (lhs: Sender.ContinuationBox, rhs: Sender.ContinuationBox) -> Bool {
            lhs === rhs
        }

        func hash(into hasher: inout Hasher) {
            hasher.combine(ObjectIdentifier(self))
        }
    }
}

Note that capturing continuation or box in onTermination is safe, because onTermination is dropped after being called. And it is always called, even if AsyncStream is discarded without even being iterated.

7 Likes