How do you completely cancel an AsyncStream?

I think you may be hitting a combination of three issues:

  1. Task.isCancelled is broken and incorrect in releases prior to Xcode 13.2b2. Makes it hard to check for cancellation while performing work.
  2. AsyncStream itself doesn't properly manage its lifetime, so you may not be seeing the correct behavior. Whether that's actually affecting what you're doing here I don't know. This has been fixed, but it seems unlikely the fix will be released with Xcode 13.2 and may not be released until Swift 5.6 in the spring.
  3. AsyncStream's builder is not an async context, and so doesn't inherit automatic cancellation (I think). I'm not sure if there's anywhere you can attach a separate cancellation handler to fix that though.

Fundamentally, you need some way to cancel your async work when the AsyncStream completes or when iteration is stopped. You can do that by wrapping the stream in your own AsyncSequence and use the lifetime of the iterator to trigger cleanup, as demonstrated in GRDB. I've built a slightly generalized version of the approach for Alamofire.

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct StreamOf<Element>: AsyncSequence {
    public typealias AsyncIterator = Iterator
    public typealias BufferingPolicy = AsyncStream<Element>.Continuation.BufferingPolicy
    fileprivate typealias Continuation = AsyncStream<Element>.Continuation

    private let bufferingPolicy: BufferingPolicy
    private let onTermination: (() -> Void)?
    private let builder: (Continuation) -> Void

    fileprivate init(bufferingPolicy: BufferingPolicy = .unbounded,
                     onTermination: (() -> Void)? = nil,
                     builder: @escaping (Continuation) -> Void) {
        self.bufferingPolicy = bufferingPolicy
        self.onTermination = onTermination
        self.builder = builder
    }

    public func makeAsyncIterator() -> Iterator {
        var continuation: AsyncStream<Element>.Continuation?
        let stream = AsyncStream<Element> { innerContinuation in
            continuation = innerContinuation
            builder(innerContinuation)
        }

        return Iterator(iterator: stream.makeAsyncIterator()) {
            continuation?.finish()
            self.onTermination?()
        }
    }

    public struct Iterator: AsyncIteratorProtocol {
        private final class Token {
            private let onDeinit: () -> Void

            init(onDeinit: @escaping () -> Void) {
                self.onDeinit = onDeinit
            }

            deinit {
                onDeinit()
            }
        }

        private var iterator: AsyncStream<Element>.AsyncIterator
        private let token: Token

        init(iterator: AsyncStream<Element>.AsyncIterator, onCancellation: @escaping () -> Void) {
            self.iterator = iterator
            token = Token(onDeinit: onCancellation)
        }

        public mutating func next() async -> Element? {
            await iterator.next()
        }
    }
}

But in the end this is all based on experimentation; I'm still not certain how this is all supposed to work.

5 Likes