[Pitch] Add Hashable conformance to AsyncStream.Continuation

Code that works with multiple AsyncStreams may need to store multiple continuations. When handling the onTermination callback, client code needs to remove the relevant continuation.

To identify the relevant continuation, client code needs to be able to compare continuations.

It is possible to associate a lookup key with each continuation, but this is inefficient. AsyncStream.Continuation already stores a reference to AsyncStream._Storage, whose identity can be used to provide a simple and efficient Hashable conformance.
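To illustrate the idea of identity-based hashing, here is a minimal sketch using stand-in types. ToyStorage and ToyContinuation are hypothetical names made up for this example; the real AsyncStream._Storage is internal to the standard library, and this is not the actual implementation.

```swift
// Hypothetical stand-in for AsyncStream._Storage (the real type is internal).
final class ToyStorage {}

// Hypothetical stand-in for AsyncStream.Continuation: a struct that
// wraps a reference to its storage object.
struct ToyContinuation: Hashable {
    let storage: ToyStorage

    // Two continuations are equal when they refer to the same storage object.
    static func == (lhs: ToyContinuation, rhs: ToyContinuation) -> Bool {
        lhs.storage === rhs.storage
    }

    // Hash the identity of the storage object, not its contents.
    func hash(into hasher: inout Hasher) {
        hasher.combine(ObjectIdentifier(storage))
    }
}

let shared = ToyStorage()
let a = ToyContinuation(storage: shared)
let b = ToyContinuation(storage: shared)        // a copy referring to the same storage
let c = ToyContinuation(storage: ToyStorage())  // a different stream's storage

var set: Set<ToyContinuation> = [a, c]
set.remove(b)  // removes the entry for `a`, because `a == b`
```

No extra key has to be stored or generated: the reference the struct already holds serves as the identity.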

Consider this simple Observer pattern with an AsyncSequence-based API. To avoid implementing AsyncSequence from scratch, it uses AsyncStream as a building block. To support multiple subscribers, a new stream is returned every time.

@MainActor private class Sender {
    var value: Int = 0 {
        didSet {
            for c in continuations {
                c.yield(value)
            }
        }
    }

    var values: some AsyncSequence<Int, Never> {
        AsyncStream<Int>(bufferingPolicy: .bufferingNewest(1)) { continuation in
            continuation.yield(value)
            self.continuations.insert(continuation)
            continuation.onTermination = { _ in
                DispatchQueue.main.async {
                    self.continuations.remove(continuation)
                }
            }
        }
    }

    private var continuations: Set<AsyncStream<Int>.Continuation> = []
}

Without Hashable conformance, each continuation needs to be associated with an artificial identifier, e.g. object identity obtained by wrapping each continuation in a class:

@MainActor private class Sender {
    var value: Int = 0 {
        didSet {
            for c in continuations {
                c.value.yield(value)
            }
        }
    }

    var values: some AsyncSequence<Int, Never> {
        AsyncStream<Int> { (continuation: AsyncStream<Int>.Continuation) -> Void in
            continuation.yield(value)
            let box = ContinuationBox(value: continuation)
            self.continuations.insert(box)
            continuation.onTermination = { _ in
                DispatchQueue.main.async {
                    self.continuations.remove(box)
                }
            }
        }
    }

    private var continuations: Set<ContinuationBox> = []

    private final class ContinuationBox: Hashable, Sendable {
        let value: AsyncStream<Int>.Continuation

        init(value: AsyncStream<Int>.Continuation) {
            self.value = value
        }

        static func == (lhs: Sender.ContinuationBox, rhs: Sender.ContinuationBox) -> Bool {
            lhs === rhs
        }

        func hash(into hasher: inout Hasher) {
            hasher.combine(ObjectIdentifier(self))
        }
    }
}

Note that capturing the continuation or the box in onTermination is safe, because onTermination is dropped after being called. And it is always called, even if the AsyncStream is discarded without ever being iterated.
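The claim about discarded streams can be checked with a small experiment. This is only a sketch: TerminationFlag and makeAndDiscardStream are hypothetical helpers introduced for the example, and the result reflects the behavior described above rather than a documented guarantee.

```swift
import Foundation

// Hypothetical helper: a small thread-safe counter, so that the
// @Sendable onTermination closure can record that it ran.
final class TerminationFlag: @unchecked Sendable {
    private let lock = NSLock()
    private var calls = 0

    func record() {
        lock.lock()
        calls += 1
        lock.unlock()
    }

    var count: Int {
        lock.lock()
        defer { lock.unlock() }
        return calls
    }
}

// Creates a stream, never iterates it, and lets it go out of scope.
func makeAndDiscardStream(flag: TerminationFlag) {
    let stream = AsyncStream<Int> { continuation in
        continuation.onTermination = { _ in flag.record() }
    }
    _ = stream  // silence the unused-variable warning; the stream is dropped on return
}

let flag = TerminationFlag()
makeAndDiscardStream(flag: flag)
print("onTermination calls:", flag.count)
```

If the handler runs when the stream is released, as described above, the recorded count is non-zero even though no iterator was ever created.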

I think this is a great addition as I ran into this problem when designing my networking library, but ultimately went with a different data structure and rigamarole to solve it. I also see future use cases where this solution would be optimal for everyone's usage.

There's a simpler solution which avoids needing to hash the continuation itself: count the number of times values is requested, then associate that count with the continuation in a map.

It can also be very useful when debugging.

@MainActor private class Sender {
    private(set) var count: UInt = 0
    var value: Int = 0 {
        didSet {
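            // Iterate the dictionary's values; iterating the dictionary itself yields (key, value) pairs.
            for c in continuations.values { c.yield(value) }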
        }
    }

    var values: some AsyncSequence<Int, Never> {
        AsyncStream<Int>(bufferingPolicy: .bufferingNewest(1)) { continuation in
            continuation.yield(value)
            let id = count + 1
            count = id
            continuations[id] = continuation
            continuation.onTermination = { _ in
                // onTermination is not isolated to the main actor, so hop back explicitly.
                Task { @MainActor in self.continuations.removeValue(forKey: id) }
            }
        }
    }

    private var continuations: [UInt: AsyncStream<Int>.Continuation] = [:]
}

I think this is a fine addition if we can pull it off in an ABI-stable manner. Even though, as pointed out, you could use some sort of associated table entry id, I think that adding hashability has little cost given the implementation today and measurable gains for users.

I feel quite the opposite about the solution you provided. Conforming to Hashable is much simpler than manually keeping track of and cleaning up continuations. Not to mention the added complexity (and performance overhead) of dictionary lookups and manipulation, dealing with heap objects, Sendability, and creating a new task. All of this needs to be considered when designing a solution, and a simple Hashable conformance would suffice for how this type should be used.
