let t = detach {
func make123Stream() -> AsyncStream<Int> {
AsyncStream { continuation in
continuation.onTermination = { termination in
switch termination {
case .finished:
print("Regular finish")
case .cancelled:
print("Cancellation")
}
}
detach {
for n in 1...3 {
continuation.yield(n)
sleep(2)
}
continuation.finish()
}
}
}
for await n in make123Stream() {
print("for-in: \(n)")
}
print("After")
}
sleep(3)
t.cancel()
// for-in: 1
// for-in: 2
// Cancellation
// After
// ... yield() is still being called
When .cancel() is called on the outermost Task t, the continuation's handler is correctly called, as expected. However, the nested, detached task will continue to call yield until its loop finishes.
The end result is that the "work" that the nested, detached task is performing will continue to be performed even though the parent task was cancelled. Inserting a call to Task.isCancelled within the for loop does not return true.
Is there a pattern here that one should use to propagate the cancellation downstream? I can achieve what I want by creating my own type that simply wraps an isCancelled flag and is accessible to the continuation's closure and the detached Task's closure, but is that the expected way?
I think you may be hitting a combination of three issues:
Task.isCancelled is broken and incorrect in releases prior to Xcode 13.2b2. Makes it hard to check for cancellation while performing work.
AsyncStream itself doesn't properly manage its lifetime, so you may not be seeing the correct behavior. Whether that's actually affecting what you're doing here I don't know. This has been fixed, but it seems unlikely the fix will be released with Xcode 13.2 and may not be released until Swift 5.6 in the spring.
AsyncStream's builder is not an async context, and so doesn't inherit automatic cancellation (I think). I'm not sure if there's anywhere you can attach a separate cancellation handler to fix that though.
Fundamentally, you need some way to cancel your async work when the AsyncStream completes or when iteration is stopped. You can do that by wrapping the stream in your own AsyncSequence and use the lifetime of the iterator to trigger cleanup, as demonstrated in GRDB. I've built a slightly generalized version of the approach for Alamofire.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct StreamOf<Element>: AsyncSequence {
public typealias AsyncIterator = Iterator
public typealias BufferingPolicy = AsyncStream<Element>.Continuation.BufferingPolicy
fileprivate typealias Continuation = AsyncStream<Element>.Continuation
private let bufferingPolicy: BufferingPolicy
private let onTermination: (() -> Void)?
private let builder: (Continuation) -> Void
fileprivate init(bufferingPolicy: BufferingPolicy = .unbounded,
onTermination: (() -> Void)? = nil,
builder: @escaping (Continuation) -> Void) {
self.bufferingPolicy = bufferingPolicy
self.onTermination = onTermination
self.builder = builder
}
public func makeAsyncIterator() -> Iterator {
var continuation: AsyncStream<Element>.Continuation?
let stream = AsyncStream<Element> { innerContinuation in
continuation = innerContinuation
builder(innerContinuation)
}
return Iterator(iterator: stream.makeAsyncIterator()) {
continuation?.finish()
self.onTermination?()
}
}
public struct Iterator: AsyncIteratorProtocol {
private final class Token {
private let onDeinit: () -> Void
init(onDeinit: @escaping () -> Void) {
self.onDeinit = onDeinit
}
deinit {
onDeinit()
}
}
private var iterator: AsyncStream<Element>.AsyncIterator
private let token: Token
init(iterator: AsyncStream<Element>.AsyncIterator, onCancellation: @escaping () -> Void) {
self.iterator = iterator
token = Token(onDeinit: onCancellation)
}
public mutating func next() async -> Element? {
await iterator.next()
}
}
}
But in the end this is all based on experimentation; I'm still not certain how this is all supposed to work.
That's awesome, thanks for the insights Jon. Coming from GCD, I already had a very basic Canceller class that could be passed around to indicate if an operation should continue running or not. It's primitive, but works reliably. I can easily use that here since the continuation and nested task can both capture it.
I'll keep an eye on Swift 5.6 for changes. For my use cases, being able to quickly and reliably cancel tasks is critical to maintaining performance.
There is detached task that no-ones cancels. It should be cancelled in the termination handler
There is should be await Task.sleep instead of sleep. I think Apple should add compilation error for usage sleep in the async environment. It's looking like all is working fine, but it's breaking all async/await paradigm.