How do you completely cancel an AsyncStream?

In SE-0314: AsyncStream and AsyncThrowingStream, under the section Cancellation Handlers the following code is shown:

let t = detach {
  func make123Stream() -> AsyncStream<Int> {
    AsyncStream { continuation in
      continuation.onTermination = { termination in
        switch termination {
        case .finished:
            print("Regular finish")
        case .cancelled:
            print("Cancellation")
        }
      }
      detach {
        for n in 1...3 {
          continuation.yield(n)
          sleep(2)
        }
        continuation.finish()
      }
    }
  }

  for await n in make123Stream() {
    print("for-in: \(n)")
  }
  print("After")
}
sleep(3)
t.cancel()

// for-in: 1
// for-in: 2
// Cancellation
// After

// ... yield() is still being called

When .cancel() is called on the outermost Task t, the continuation's handler is correctly called, as expected. However, the nested, detached task will continue to call yield until its loop finishes.

The end result is that the "work" that the nested, detached task is performing will continue to be performed even though the parent task was cancelled. Inserting a call to Task.isCancelled within the for loop does not return true.

Is there a pattern here that one should use to propagate the cancellation downstream? I can achieve what I want by creating my own type that simply wraps an isCancelled flag and is accessible to the continuation's closure and the detached Task's closure, but is that the expected way?

Related discussion over here: AsyncStream and Cancellation

3 Likes

I think you may be hitting a combination of three issues:

  1. Task.isCancelled is broken and incorrect in releases prior to Xcode 13.2b2. Makes it hard to check for cancellation while performing work.
  2. AsyncStream itself doesn't properly manage its lifetime, so you may not be seeing the correct behavior. Whether that's actually affecting what you're doing here I don't know. This has been fixed, but it seems unlikely the fix will be released with Xcode 13.2 and may not be released until Swift 5.6 in the spring.
  3. AsyncStream's builder is not an async context, and so doesn't inherit automatic cancellation (I think). I'm not sure if there's anywhere you can attach a separate cancellation handler to fix that though.

Fundamentally, you need some way to cancel your async work when the AsyncStream completes or when iteration is stopped. You can do that by wrapping the stream in your own AsyncSequence and use the lifetime of the iterator to trigger cleanup, as demonstrated in GRDB. I've built a slightly generalized version of the approach for Alamofire.

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct StreamOf<Element>: AsyncSequence {
    public typealias AsyncIterator = Iterator
    public typealias BufferingPolicy = AsyncStream<Element>.Continuation.BufferingPolicy
    fileprivate typealias Continuation = AsyncStream<Element>.Continuation

    private let bufferingPolicy: BufferingPolicy
    private let onTermination: (() -> Void)?
    private let builder: (Continuation) -> Void

    fileprivate init(bufferingPolicy: BufferingPolicy = .unbounded,
                     onTermination: (() -> Void)? = nil,
                     builder: @escaping (Continuation) -> Void) {
        self.bufferingPolicy = bufferingPolicy
        self.onTermination = onTermination
        self.builder = builder
    }

    public func makeAsyncIterator() -> Iterator {
        var continuation: AsyncStream<Element>.Continuation?
        let stream = AsyncStream<Element> { innerContinuation in
            continuation = innerContinuation
            builder(innerContinuation)
        }

        return Iterator(iterator: stream.makeAsyncIterator()) {
            continuation?.finish()
            self.onTermination?()
        }
    }

    public struct Iterator: AsyncIteratorProtocol {
        private final class Token {
            private let onDeinit: () -> Void

            init(onDeinit: @escaping () -> Void) {
                self.onDeinit = onDeinit
            }

            deinit {
                onDeinit()
            }
        }

        private var iterator: AsyncStream<Element>.AsyncIterator
        private let token: Token

        init(iterator: AsyncStream<Element>.AsyncIterator, onCancellation: @escaping () -> Void) {
            self.iterator = iterator
            token = Token(onDeinit: onCancellation)
        }

        public mutating func next() async -> Element? {
            await iterator.next()
        }
    }
}

But in the end this is all based on experimentation; I'm still not certain how this is all supposed to work.

6 Likes

That's awesome, thanks for the insights Jon. Coming from GCD, I already had a very basic Canceller class that could be passed around to indicate if an operation should continue running or not. It's primitive, but works reliably. I can easily use that here since the continuation and nested task can both capture it.

I'll keep an eye on Swift 5.6 for changes. For my use cases, being able to quickly and reliably cancel tasks is critical to maintaining performance.

Hello!
It's quite buggy snippet:

  1. There is detached task that no-ones cancels. It should be cancelled in the termination handler

  2. There is should be await Task.sleep instead of sleep. I think Apple should add compilation error for usage sleep in the async environment. It's looking like all is working fine, but it's breaking all async/await paradigm.

2 Likes