What to expect from `AsyncSequence` cancellation?

I tend to write a task cancellation check inside the for loop as a safeguard, but can this ever theoretically happen? Usually my mental model is to check after every resumption from an await if the parent task got cancelled and then act in the best possible way, but async sequences are a bit special as they would usually nil out on cancellation in the event of the continuation waiting to resume.

func foo() async {
  ...
  for await element in nonThrowingAsyncSequence {
    if Task.isCancelled { 
      break // will I ever be able to enter this branch?
    }
  }
  // Or will any stream always `nil` out and break / complete?
  ...
}

My understanding is that if the async sequence is 'well behaved', your branch will likely never be called – but it's not impossible. The asynchronous sequence may emit an element before emitting its 'nil' termination element (on cancellation of the task), and then the branch would be called.

To be a well behaved asynchronous sequence wrt cancellation, all that's required is that the asynchronous sequence make 'best efforts' to shut things down as soon as possible, but some leeway is given to provide the sequence a chance to synchronise its behaviour, i.e. completing its current operation.

In this scenario, I probably wouldn't; do the check though, and simply wait for sequence to emit nil.

What @tcldr is correct. All AsyncSequences should adhere to cooperative cancellation. The ones implemented in the stdlib/async-algos/nio all wrap the continuations with cancellation handlers.

However, as @tcldr noted your branch might potentially get hit due to the fact that cancellation comes from another thread. I would recommend trusting that AsyncSequences are well behaved and will terminate your iteration as soon as possible. Checking cancellation is quite costly and doing that unconditionally in every iteration should be avoided

1 Like

I was actually curious about what happens if you nullify the class holding the streams continuation.

Looks like the loop terminates as long as you make sure to call continuation.finish in your deinit

func testAsyncLoopCancels() async throws {
    var test = Optional.some(AsyncStreamCancellationTestClass())

    let task1 = Task {
        var lastValue: Int?

        for await value in test!.stream {
            if Task.isCancelled {
                break // this block is never called
            }

            lastValue = value
        }

        return lastValue
    }

    try await Task.sleep(nanoseconds: 1_000_000_000)
    test?.yieldValue(value: 1)
    test = .none

    let value = await task1.value

    XCTAssertEqual(value, 1)
}

class AsyncStreamCancellationTestClass {
    private var continuation: AsyncStream<Int>.Continuation?

    var stream: AsyncStream<Int> {
        self.continuation?.finish()

        return AsyncStream { continuation in
            continuation.onTermination = { [weak self] _ in
                self?.continuation = nil
            }

            self.continuation = continuation
        }
    }

    func yieldValue(value: Int) {
        continuation?.yield(value)
    }

    deinit {
        continuation?.finish() // if you comment this out the test fails
    }
}

If you want to learn more around this topic and the (not) expected behaviors, I highly encourage you to try to scan this discussion here: AsyncSequences and cooperative task cancellation

One of the conclusions from it that I had, was that the question I asked in this thread presents a valid and expected pattern for early exiting the consuming stream / sequence.