I think you may be hitting a combination of three issues:
- Task.isCancelled is broken in releases prior to Xcode 13.2b2, which makes it hard to check for cancellation while performing work.
- AsyncStream itself doesn't properly manage its lifetime, so you may not be seeing the correct behavior. Whether that's actually affecting what you're doing here, I don't know. This has been fixed, but it seems unlikely the fix will ship with Xcode 13.2, and it may not be released until Swift 5.6 in the spring.
- AsyncStream's builder is not an async context, and so doesn't inherit automatic cancellation (I think). I'm not sure if there's anywhere you can attach a separate cancellation handler to fix that, though.
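For the third point, one candidate hook is the continuation's onTermination property, which is part of the standard library's AsyncStream.Continuation. The following is only a minimal sketch: countingStream is a hypothetical helper invented for illustration, and given the lifetime bug described in the second point, the handler may not fire reliably on affected toolchains.

```swift
// Hypothetical helper (not from the original post): emits 0..<limit.
func countingStream(upTo limit: Int) -> AsyncStream<Int> {
    AsyncStream<Int> { continuation in
        // The builder closure is synchronous, so the async work runs in its own Task.
        let producer = Task {
            for value in 0..<limit {
                // Cooperative cancellation check between units of work.
                if Task.isCancelled { break }
                continuation.yield(value)
                await Task.yield()
            }
            continuation.finish()
        }
        // Runs when the stream finishes or the consuming task is cancelled.
        continuation.onTermination = { @Sendable _ in
            producer.cancel()
        }
    }
}
```

Cancelling a task that has already finished is harmless, so the handler does not need to distinguish between normal completion and cancellation.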
Fundamentally, you need some way to cancel your async work when the AsyncStream completes or when iteration is stopped. You can do that by wrapping the stream in your own AsyncSequence and using the lifetime of the iterator to trigger cleanup, as demonstrated in GRDB. I've built a slightly generalized version of the approach for Alamofire.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct StreamOf<Element>: AsyncSequence {
    public typealias AsyncIterator = Iterator
    public typealias BufferingPolicy = AsyncStream<Element>.Continuation.BufferingPolicy
    fileprivate typealias Continuation = AsyncStream<Element>.Continuation

    private let bufferingPolicy: BufferingPolicy
    private let onTermination: (() -> Void)?
    private let builder: (Continuation) -> Void

    fileprivate init(bufferingPolicy: BufferingPolicy = .unbounded,
                     onTermination: (() -> Void)? = nil,
                     builder: @escaping (Continuation) -> Void) {
        self.bufferingPolicy = bufferingPolicy
        self.onTermination = onTermination
        self.builder = builder
    }

    public func makeAsyncIterator() -> Iterator {
        // Keep a reference to the continuation so the stream can be finished
        // when the iterator goes away.
        var continuation: AsyncStream<Element>.Continuation?
        let stream = AsyncStream<Element> { innerContinuation in
            continuation = innerContinuation
            builder(innerContinuation)
        }

        return Iterator(iterator: stream.makeAsyncIterator()) {
            continuation?.finish()
            self.onTermination?()
        }
    }

    public struct Iterator: AsyncIteratorProtocol {
        // Reference type whose deinit runs the cleanup closure once the
        // iterator holding it is released.
        private final class Token {
            private let onDeinit: () -> Void

            init(onDeinit: @escaping () -> Void) {
                self.onDeinit = onDeinit
            }

            deinit {
                onDeinit()
            }
        }

        private var iterator: AsyncStream<Element>.AsyncIterator
        private let token: Token

        init(iterator: AsyncStream<Element>.AsyncIterator, onCancellation: @escaping () -> Void) {
            self.iterator = iterator
            token = Token(onDeinit: onCancellation)
        }

        public mutating func next() async -> Element? {
            await iterator.next()
        }
    }
}
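The mechanism the wrapper leans on can be looked at in isolation: a class instance stored in a struct is released when the last copy of that struct goes away, and its deinit can run cleanup at that point. Here is a minimal sketch of just that part. DeinitToken, Holder, and demonstrateTokenLifetime are hypothetical names for illustration, with Holder standing in for the iterator.

```swift
// Hypothetical stand-in for the private Token class above.
final class DeinitToken {
    private let onDeinit: () -> Void

    init(onDeinit: @escaping () -> Void) {
        self.onDeinit = onDeinit
    }

    deinit {
        onDeinit()
    }
}

// Hypothetical stand-in for the iterator: a value type that owns a token.
struct Holder {
    let token: DeinitToken
}

// Reports whether cleanup has run by the time the holder's scope has ended.
func demonstrateTokenLifetime() -> Bool {
    var cleanedUp = false
    do {
        let holder = Holder(token: DeinitToken { cleanedUp = true })
        _ = holder
    }
    // The only reference to the token is gone here, so deinit has run.
    return cleanedUp
}
```

In the wrapper, the same release happens when a for await loop exits or the task that owns the iterator is torn down, which is what finishes the continuation and calls onTermination.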
But in the end this is all based on experimentation; I'm still not certain how this is all supposed to work.