AsyncSequences and cooperative task cancellation

Correct. That's one of my points. The task cancellation is forwarded through the async sequence chain, but the consumer of the sequence can decide to break out from the consumption immediately after receiving the next value. That's a totally valid thing to do. And it finally would provide a clear answer to my previous question from this thread.

That should be fine as well. A task can only branch out if it uses child task creation via task groups or async let. Even if you an async method such as AsyncStream.Iterator.next() would capture one or multiple continuations from a parent task, we can still logically resolve and resume. With a single consumer, this is trivial as there won't be another call to next from within a loop body, hence there's no continuation captured by the algorithm. If we would create child tasks and each would capture and cache another continuation, then they will resolve in the undefined order of their capture while the producer kept emitting elements or if we had pending elements left. When the producer finished the stream / buffer and if we have still multiple continuations awaiting, we can resume them all by forwarding the termination (nil or an error).

:slight_smile: Happy to hear that. In my opinion AsyncSyncSequence should not cancel, as it's a transformation sequence type from sync into async and the sync sequence is its non-cancellable base. Thus it's also non cancellable. That would also indicate that we need an AsyncSequence type which would allow us to explicitly break / cancel from non-cancellable sequences chain when Task.isCancelled is detected.

[1, 2, 3].async.stopOnCancellation().map { "\($0)" } // will cancel immediately
[1, 2, 3].async.map { "\($0)" } // will emit "1", "2" and "3" and finish

I cannot agree more here! :+1:

We should double check all AsyncSequence types that we shipped for their behavior including the algorithms and re-evalutate where necessary.

I'm sorry for bringing this important topic up so late. :upside_down_face:


One more hypothetical example.

Let's conform Task<Success, Failure> to AsyncSequence (it would need typed throws, but let's ignore it for this example).

Task as an AsyncSequence would simply use withTaskCancellationHandler in it's Iterator.next() method and just call self.cancel() on cancellation. It will then cooperatively either throw or return. As Task as AsyncSequence can be used like Combine.Just or potentially Combine.Empty depending on the configuration of the generic type parameters.

extension Task: AsyncSequence where Failure == any Error {
  public typealias Element = Success

  public struct AsyncIterator: AsyncIteratorProtocol {
    let task: Task
    var finished: Bool = false

    public mutating func next() async throws -> Success? {
      guard !finished else {
        return nil
      }
      defer {
        finished = true
      }
      return try await withTaskCancellationHandler {
        try await task.value
      } onCancel: { [task] in
        task.cancel()
      }
    }
  }

  public func makeAsyncIterator() -> AsyncIterator {
    AsyncIterator(task: self)
  }
}

Editing these things in as there's a three posts limit:

Why though? The debounce can tell the producer in the chain can you please stop producing as soon as possible, because I got cancelled. In other words it would decelerate. However the debounce does not have to debounce the finishing signal from the produce and immediately forward that by resuming the continuation (if it's based on one). That would be a cooperative transformation again.

-0---1-2-3---^4|--->
[ === debounce === ]
---0-------3-^--4|->
             ^ cooperative cancel signal

I cannot comment on merge as I haven't closely looked into it's implementation, but logically it should just forward the cancellation on all producers and wait for the last of them to stop to stop itself. This behavior would be cooperative.

---B--^|-------->
-A----^-A--|---->
[ === merge === ]
-A-B--^-A--|---->
      ^ cooperative cancel signal

In case of an upstream error:

---B-X--------->
-A---^--A-|---->
[ === merge === ]
     ^
     |
-A-B-x----X---->
__________________
-A-B------X----> // consumer

// if one upstream fails, the other one is cancelled cooperatively
// last A is discarded
// the first error is emitted when both upstreams terminate
// otherwise upstreams would outlive the `merge`-downstream

Using the non-throwing marble diagram and transforming one cancellation into CancellationError:

---B--^X-------->
-A----^^-A-|----> // second cancellation is a no-op
[ === merge === ]
       ^
       |
-A-B--^x---X---->
_________________
-A-B--^----X----> // consumer 
      ^ cooperative cancel signal

Let's think about cooperative task cancellation on context of AsyncSequence as a hopefully fast deceleration of all producers until they eventually stop and everything returns or throws an error. It's such a powerful ability for sequences that I haven't seen anywhere else. In fact this is exactly why I think that cooperative task cancellation can be used to build graceful shutdown of a process or even the entire application.

@main struct Application {
  static func main() async { ... }
}

// the OS will do this then
let task = Task {
  await Application.main()
}

// graceful shut down as it will recursively spread through the entire
// async task tree and sequences and signal and mark everything as cancelled
// the application code will then try its best to shut down as soon as possible
// perform last save operation where needed and stop
task.cancel()

// give it some short window
await task.value
// otherwise kill the process forcefully

Regular (forceful) cancellation cannot be graceful, but "cooperative" cancellation can!