AsyncSequences and cooperative task cancellation

I think the key phrase here is 'as soon as possible'. I do think that this should be on the programmer, the end user of AsyncSequence pipelines, to decide and implement that 'as soon as possible'.

As the documentation for Task states:

Tasks include a shared mechanism for indicating cancellation, but not a shared implementation for how to handle cancellation. Depending on the work you’re doing in the task, the correct way to stop that work varies. Likewise, it’s the responsibility of the code running as part of the task to check for cancellation whenever stopping is appropriate.

In that spirit, I feel that the plumbing provided for structured concurrency (via AsyncSequence) should defer responsibility for cancellation to the programmer in their concrete use case wherever and whenever that's feasible.

It does place an additional burden of responsibility on the documentation to clearly state what behaviour any one sequence has in regards to cancellation, but it seems a worthwhile cost and consideration.

Many AsyncSequences behave correctly anyway (filter, map) so it's just something additional to consider when creating new algorithms, and some tweaks/documentation for existing ones.

2 Likes

I agree with this. We should leave it to the consumer of an AsyncSequence to handle cancellation as well, e.g. they can check Task.isCancelled on every element the sequence produces and decide to terminate:

for await element in someSequence {
  if Task.isCancelled { return }
}

However, there is one important point in time where the implementation of AsyncSequences MUST handle cancellation and that is when they take a continuation. The reason for this is that at that point in time the consumer can't handle it on their own since they are still waiting for the next element.
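To illustrate why, here is a deliberately minimal, single-consumer channel sketch. It is not the real AsyncStream implementation; the type and all names are invented for illustration, and a production version would need to handle races between cancellation and suspension more carefully:

```swift
import Foundation

// Minimal single-consumer channel sketch showing why the sequence itself
// must handle cancellation while a continuation is stashed: the consumer is
// suspended inside next() and cannot observe Task.isCancelled on its own.
final class SimpleChannel<Element: Sendable>: @unchecked Sendable {
  private let lock = NSLock()
  private var buffer: [Element] = []
  private var pending: CheckedContinuation<Element?, Never>?
  private var finished = false

  func send(_ element: Element) {
    lock.lock()
    if let continuation = pending {
      pending = nil
      lock.unlock()
      continuation.resume(returning: element)
    } else {
      buffer.append(element)
      lock.unlock()
    }
  }

  func finish() {
    lock.lock()
    finished = true
    let continuation = pending
    pending = nil
    lock.unlock()
    continuation?.resume(returning: nil)
  }

  func next() async -> Element? {
    await withTaskCancellationHandler {
      await withCheckedContinuation { (continuation: CheckedContinuation<Element?, Never>) in
        lock.lock()
        if !buffer.isEmpty {
          let element = buffer.removeFirst()
          lock.unlock()
          continuation.resume(returning: element)
        } else if finished {
          lock.unlock()
          continuation.resume(returning: nil)
        } else {
          // Nothing buffered: stash the continuation until send/finish/cancel.
          pending = continuation
          lock.unlock()
        }
      }
    } onCancel: {
      // Resume the stashed continuation with nil; nobody else can, because
      // the consumer is suspended right here waiting for the next element.
      lock.lock()
      let continuation = pending
      pending = nil
      lock.unlock()
      continuation?.resume(returning: nil)
    }
  }
}
```

Buffered elements are still delivered before the terminal nil, matching the trace below.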

A concrete example with AsyncStream:

P = Producer
C = Consumer

P yields "a"
P yields "b"
C awaits `next` -> consumes "a"
C's task gets cancelled
C awaits `next` -> consumes "b" // This still gets returned since it was buffered up
C awaits `next` -> consumes `nil` since nothing is buffered and the task was cancelled

I also agree that AsyncSyncSequence's cancellation behaviour should be re-discussed. Currently, it checks for cancellation on every next() call. Furthermore, I think we should better document these behaviours, both in a general rule-set document for AsyncSequences and on the individual AsyncSequences themselves.

2 Likes

I can see that will be the case in many instances, but I'm not sure it will be the case in all instances. But I could be wrong.

Here's an example off the top of my head: say you take a continuation from the consuming Task in your sequence, and then spawn a detached Task which manages pulling elements from some upstream sequence. Rather than terminate the sequence immediately when you get a cancellation signal via withTaskCancellationHandler, you could simply forward the cancellation responsibility by calling cancel on the detached Task you spawned. The hope is that the upstream would then terminate gracefully and finally emit a nil value (allowing you to resume the stashed continuation), cleanly finishing the async sequence in a way that's mindful of cooperative cancellation.

1 Like

I'm happy to see that I was able to spark some re-consideration, or at least some thinking, on that particular topic. Cooperative task cancellation is so powerful, and AsyncSequences should really utilize it to the fullest. This gives them a real advantage over other reactive frameworks, at least those that I have worked with.

AsyncStream is a slightly strange fella. I would strongly suggest that Async[Throwing]Stream be treated as a buffering wrapper around some producer logic, and that the producer logic be considered the base sequence generator, just like it's done with the map and filter sequences. Thus the cooperative cancellation is signalled to the producer, and it decides how to properly finish the stream 'as soon as possible'.

P = Producer
S = Stream / Buffer
C = Consumer

P yields "a"
[Change] S buffers "a"
P yields "b"
[Change] S buffers "b"
C awaits `next` -> consumes "a"
C's task gets cancelled
[Change] C forwards cancellation signal to P through S 
[Change] P receives cancellation signal and decides to finish S
C awaits `next` -> consumes "b" // This still gets returned since it was buffered up
[Change] P finishes
[Change] S finishes
C awaits `next` -> consumes `nil` since nothing is buffered and the task was cancelled, which caused P to finish S

That example is basically exactly how I was able to work around the current Async[Throwing]Stream behavior. I had to detach the call to next from the current task. If the parent gets cancelled I forward the signal through the stream continuation's onTerminate handler, which reaches the Task that drives the stream. It gracefully shuts down and finishes the stream, which results in nil or an error arriving at the consumer. See the implementation of AsyncCooperativeStream above.

It does respect:

  • cooperative task cancellation (as soon as possible)
  • back pressure / pull based approach
1 Like

Correct. That's one of my points. The task cancellation is forwarded through the async sequence chain, but the consumer of the sequence can decide to break out from the consumption immediately after receiving the next value. That's a totally valid thing to do. And it finally would provide a clear answer to my previous question from this thread.

That should be fine as well. A task can only branch out if it uses child task creation via task groups or async let. Even if an async method such as AsyncStream.Iterator.next() captured one or multiple continuations from a parent task, we can still logically resolve and resume them. With a single consumer this is trivial, as there won't be another call to next from within the loop body, hence no continuation is captured by the algorithm. If we created child tasks and each captured and cached another continuation, they would resolve in the undefined order of their capture while the producer kept emitting elements, or while we had pending elements left. When the producer finishes the stream / buffer and we still have multiple continuations awaiting, we can resume them all by forwarding the termination (nil or an error).

:slight_smile: Happy to hear that. In my opinion AsyncSyncSequence should not cancel, as it's a transformation sequence type from sync into async, and the sync sequence is its non-cancellable base. Thus it's also non-cancellable. That would also indicate that we need an AsyncSequence type which allows us to explicitly break / cancel a chain of non-cancellable sequences when Task.isCancelled is detected.

[1, 2, 3].async.stopOnCancellation().map { "\($0)" } // will cancel immediately
[1, 2, 3].async.map { "\($0)" } // will emit "1", "2" and "3" and finish
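A rough sketch of what such a stopOnCancellation() modifier could look like; the type and operator names here are hypothetical, not part of any shipped library:

```swift
// Hypothetical `stopOnCancellation()` modifier: a thin wrapper that checks
// Task.isCancelled on every next() call and terminates the chain eagerly,
// so a non-cancellable base can opt in to cancellation at this point.
struct AsyncStopOnCancellationSequence<Base: AsyncSequence>: AsyncSequence {
  typealias Element = Base.Element
  let base: Base

  struct AsyncIterator: AsyncIteratorProtocol {
    var baseIterator: Base.AsyncIterator

    mutating func next() async throws -> Element? {
      // Eager termination point for the whole chain below this operator.
      guard !Task.isCancelled else { return nil }
      return try await baseIterator.next()
    }
  }

  func makeAsyncIterator() -> AsyncIterator {
    AsyncIterator(baseIterator: base.makeAsyncIterator())
  }
}

extension AsyncSequence {
  func stopOnCancellation() -> AsyncStopOnCancellationSequence<Self> {
    AsyncStopOnCancellationSequence(base: self)
  }
}
```

Downstream operators like map then inherit whichever behaviour the chain constructor chose.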

I cannot agree more here! :+1:

We should double check all AsyncSequence types that we shipped for their behavior, including the algorithms, and re-evaluate where necessary.

I'm sorry for bringing this important topic up so late. :upside_down_face:


One more hypothetical example.

Let's conform Task<Success, Failure> to AsyncSequence (it would need typed throws, but let's ignore it for this example).

Task as an AsyncSequence would simply use withTaskCancellationHandler in its Iterator.next() method and just call self.cancel() on cancellation. It will then cooperatively either throw or return. A Task as an AsyncSequence can be used like Combine.Just or potentially Combine.Empty, depending on the configuration of the generic type parameters.

extension Task: AsyncSequence where Failure == any Error {
  public typealias Element = Success

  public struct AsyncIterator: AsyncIteratorProtocol {
    let task: Task
    var finished: Bool = false

    public mutating func next() async throws -> Success? {
      guard !finished else {
        return nil
      }
      defer {
        finished = true
      }
      return try await withTaskCancellationHandler {
        try await task.value
      } onCancel: { [task] in
        task.cancel()
      }
    }
  }

  public func makeAsyncIterator() -> AsyncIterator {
    AsyncIterator(task: self)
  }
}

Editing these things in as there's a three posts limit:

Why though? The debounce can tell the producer in the chain to please stop producing as soon as possible, because it got cancelled. In other words it would decelerate. However, the debounce does not have to debounce the finishing signal from the producer and can immediately forward that by resuming the continuation (if it's based on one). That would be a cooperative transformation again.

-0---1-2-3---^4|--->
[ === debounce === ]
---0-------3-^--4|->
             ^ cooperative cancel signal

I cannot comment on merge as I haven't closely looked into its implementation, but logically it should just forward the cancellation to all producers and wait for the last of them to stop before stopping itself. This behavior would be cooperative.

---B--^|-------->
-A----^-A--|---->
[ === merge === ]
-A-B--^-A--|---->
      ^ cooperative cancel signal

In case of an upstream error:

---B-X--------->
-A---^--A-|---->
[ === merge === ]
     ^
     |
-A-B-x----X---->
__________________
-A-B------X----> // consumer

// if one upstream fails, the other one is cancelled cooperatively
// last A is discarded
// the first error is emitted when both upstreams terminate
// otherwise upstreams would outlive the `merge`-downstream

Using the non-throwing marble diagram and transforming one cancellation into CancellationError:

---B--^X-------->
-A----^^-A-|----> // second cancellation is a no-op
[ === merge === ]
       ^
       |
-A-B--^x---X---->
_________________
-A-B--^----X----> // consumer 
      ^ cooperative cancel signal

Let's think about cooperative task cancellation in the context of AsyncSequence as a hopefully fast deceleration of all producers until they eventually stop and everything returns or throws an error. It's such a powerful ability for sequences that I haven't seen anywhere else. In fact, this is exactly why I think that cooperative task cancellation can be used to build graceful shutdown of a process or even an entire application.

@main struct Application {
  static func main() async { ... }
}

// the OS will do this then
let task = Task {
  await Application.main()
}

// graceful shut down as it will recursively spread through the entire
// async task tree and sequences and signal and mark everything as cancelled
// the application code will then try its best to shut down as soon as possible
// perform last save operation where needed and stop
task.cancel()

// give it some short window
await task.value
// otherwise kill the process forcefully

Regular (forceful) cancellation cannot be graceful, but "cooperative" cancellation can!

Minor correction: as described, that is actually more like Combine.Future. Just and Empty are isomorphic to Publishers.Sequence with 1 value or 0 values respectively.

Just so I understand correctly: the first part of your ask is that the rules for AsyncSequence implementors say that algorithms deriving their values from another AsyncSequence (i.e. .debounce, .map et al.) never resume the continuation on cancellation, but instead rely on forwarding (or the mere presence of) the task's cancelled state.

For example: an AsyncSequence that checks for cancellation every 1000 elements produced that is then connected to a debounce would only produce a terminal event if the base (checking every 1000 elements) AsyncSequence produces a terminal.

More concretely: withTaskCancellationHandler's onCancel should only be used to forward cancellation to a pseudo child task and never to resume a continuation.

This logic seems reasonable and relaxes the state machine requirements for a few algorithms. To be clear: even resuming that continuation is still cooperative, I would just classify it as an eager system versus a lazy cooperative cancellation system.

However I don't follow how this applies to AsyncSyncSequence. Sequence itself has no understanding of cancellation at all. So couldn't you easily get into problematic scenarios where you have indefinite Sequence definitions that are made to be constant producers of values via .async transforming the Sequence into an AsyncSequence?

e.g.

struct Indefinite<Element>: Sequence, IteratorProtocol {
  var value: Element
  func next() -> Element? { return value }
  init(_ value: Element) { self.value = value }
}

for await item in Indefinite("hello").async {
  await doStuff(with: item)
}

I have seen plenty of use cases in similar systems for this type of thing. Making it "just work" with cooperative cancellation to me makes sense and having it not play nicely with cooperative cancellation seems more like a bug (or at the least inconsistent)

1 Like

That’s a fair point, but I think the same would apply to a non-async sequence. You'd have an infinite non-async for-in loop unless you break out of it conditionally.

I think the trick here would be to use a map and for the end user/programmer to handle cancellation in the closure provided to map as they see fit – or at the point of consumption of course.

I'm not sure how you can terminate in map other than throwing. Is that what you are meaning?

From a perf standpoint: since map is a value transform it is really performance sensitive - ideally it should defer cancellation to surrounding stuff. Checking cancellation on each pass is potentially kinda expensive.

UPDATE: Yeah, you're right. You can't use map in its existing form, you'd need an alternative algorithm. Something like:

func terminatingMap<Transformed>(_ transform: @escaping (Self.Element) async throws -> Transformed?) -> AsyncTerminatingMapSequence<Self, Transformed>

This would terminate the sequence if the closure returns a nil value.
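A sketch of what such an AsyncTerminatingMapSequence could look like; the type and operator are hypothetical working titles, not a shipped API:

```swift
// Hypothetical `terminatingMap`: like `map`, but a nil returned from the
// transform terminates the sequence early instead of being an element.
struct AsyncTerminatingMapSequence<Base: AsyncSequence, Transformed>: AsyncSequence {
  typealias Element = Transformed
  let base: Base
  let transform: (Base.Element) async throws -> Transformed?

  struct AsyncIterator: AsyncIteratorProtocol {
    var baseIterator: Base.AsyncIterator
    let transform: (Base.Element) async throws -> Transformed?
    var finished = false

    mutating func next() async throws -> Transformed? {
      guard !finished else { return nil }
      guard let element = try await baseIterator.next() else {
        finished = true // base finished normally
        return nil
      }
      guard let transformed = try await transform(element) else {
        finished = true // nil from the transform ends the sequence early
        return nil
      }
      return transformed
    }
  }

  func makeAsyncIterator() -> AsyncIterator {
    AsyncIterator(baseIterator: base.makeAsyncIterator(), transform: transform)
  }
}

extension AsyncSequence {
  func terminatingMap<Transformed>(
    _ transform: @escaping (Element) async throws -> Transformed?
  ) -> AsyncTerminatingMapSequence<Self, Transformed> {
    AsyncTerminatingMapSequence(base: self, transform: transform)
  }
}
```

The consumer never sees the nil-producing element; the iterator latches `finished` so subsequent next() calls stay nil.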

Let's say you want to guarantee that an async sequence of fibonacci numbers delivers the sequence to at least 21.

By using terminatingMap in place of map, I'm imagining:

// not tested, but hopefully illustrative
let fibonacci = FibonacciSequence()
  .async
  .terminatingMap { i in
    if Task.isCancelled && i > 21 { return nil }
    return i
  }

for await item in fibonacci {
  await doStuff(with: item)
}

Or simply at the point of consumption:

for await item in FibonacciSequence().async {
  if Task.isCancelled && item > 21 { break }
  await doStuff(with: item)
}

From a perf standpoint: since map is a value transform it is really performance sensitive - ideally it should defer cancellation to surrounding stuff. Checking cancellation on each pass is potentially kinda expensive.

Huh, interesting. Maybe there's room for algorithms that help with cancellation in this regard without incurring an unnecessary transform.

EDIT: Actually, looking further I don't think the existing map could be used this way so it would need to be a specific algorithm that aids with cancellation in some way. I've updated the example with a rough idea using the working title terminatingMap.

I see, thank you for the clarification. I was thinking here about specifically constrained Tasks like Task<Success, Never> which will yield one value and then finish with nil, or possibly Task<Never, Never>, which immediately returns nil or possibly performs Task.yield() and then returns nil.

That looks correct to me. The downstream terminates after the upstream. See my pseudo marble diagrams where we have to introduce a cooperative task cancellation that propagates from the consumer through the async sequence chain up to the sequence that can best possibly handle the cancellation.

That sounds right, but it depends on the given algorithm.

This again depends on the algorithm. Immediately resuming the continuation seems equivalent to a mixed concurrency model, where on cancellation of a structured task an unstructured (pseudo) sub-task is cancelled but not awaited. Therefore the pseudo sub-task could potentially outlive the structured one. This is currently a real possibility with an Async[Throwing]Stream wrapping a cooperative Task or an RxSwift.Observable type. Being fully cooperative, per the structured concurrency documentation I could find, means that a child task's cancellation cannot outlive the parent task.

That‘s exactly the point. The base, the sync Sequence, is not cancellable, and AsyncSyncSequence is just a transformational wrapper sequence that bridges the sync and async worlds. A transformational async sequence should base its cancellation behaviour on its base sequence. Thus it‘s also non-cancellable.

As Franz and I mentioned somewhere upthread, it‘s up to the consumer or the chain constructor to ensure that a non-cancellable sequence is broken up during cancellation in the most appropriate way possible, if at all.

let sequence1 = infiniteSyncSequence
  .stopOnCancellation() // introduce a special modifier sequence for chains
  .map { … }
  .filter { … }

let sequence2 = infiniteSyncSequence.map { … }.filter { … }

for await element in sequence2 {
  if Task.isCancelled { break } // the consumer can do it here
}

Both options are almost the same; they just differ in the point in time at which such an infinite chain is broken or stopped from being consumed.

As discussed upthread, each sequence type and modifier would / should receive documentation for its exact behavior. Therefore your Indefinite would propagate the non-cancellability through AsyncSyncSequence.

If on the other hand we made AsyncSyncSequence cancellable, as it was already implemented, then we might unexpectedly drop elements from the base sequence, and the user would need to create their own type to work around this behavior.

With a hypothetical generator function it could look somewhat like this:

func `async`<T: Sequence>(_ sequence: T) -> stream T.Element {
  for element in sequence {
    yield element
  }
}

Such a transforming generator straight-up ignores any cancellation, as it has to replay the entire sequence. If cancellation is needed in such a case, it must be introduced further down the async sequence chain or on the consumer's end.
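In today's Swift, without generator functions, the same non-cancelling bridge could be sketched as an ordinary AsyncSequence wrapper; the type name is illustrative:

```swift
// Sketch of a non-cancelling sync-to-async bridge, mirroring the
// hypothetical generator above: it replays the whole base Sequence and
// never consults Task.isCancelled, so termination follows the base alone.
struct NonCancellingAsyncSequence<Base: Sequence>: AsyncSequence {
  typealias Element = Base.Element
  let base: Base

  struct AsyncIterator: AsyncIteratorProtocol {
    var baseIterator: Base.Iterator

    mutating func next() async -> Element? {
      baseIterator.next() // no cancellation check, no suspension point added
    }
  }

  func makeAsyncIterator() -> AsyncIterator {
    AsyncIterator(baseIterator: base.makeIterator())
  }
}
```

Every element of the base sequence is replayed, cancelled task or not.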

Sharing feedback from experience since I moved all my stream types to the custom wrapper, which made them fully cooperative. The logic I have been working on was designed to be cooperative in every corner. I discovered quite a few tiny bugs and mistakes in my algorithms. As soon as those were fixed the system worked really nicely, and everything behaves predictably, sequentially and, more importantly, gracefully.

1 Like

This is where I kinda differ on opinions: I feel that any AsyncSequence that does not respond to cancellation in any form is breaking expectations. The cooperative term in "cooperative cancellation" means (for AsyncSequence): if the task iterating the async sequence is cancelled the sequence will eventually terminate, either with a nil from the iterator's next or by throwing an error.

I can accept that we perhaps shouldn't eagerly terminate - but ignoring cancel seems a step too far.

@DevAndArtist perhaps it might be a good contribution to submit some additional tests to validate using the marble diagrams in swift-async-algorithms (it has symbols for cancellations and terminal expectations). That way we can understand the scope of what the eager -> lazy set of changes would be.

In this particular example it's not really ignoring the cancellation. The cancellation just falls out.

Take the simple AsyncSequence-conforming Task above as an example. You can write it so that it eventually terminates, but not in response to a cancellation signal. This can still be a valid design decision for that root sequence.

let base = Task {
  let actor: SomeActor = ...
  while await actor.condition {
    if Task.isCancelled { print("sorry can't terminate yet") }
    await Task.yield()
  }
  return "swift"
}

// parent task
let task = Task {
  let sequence = base.map(\.count) // transformation via `map`

  for await count in sequence { // consumer
    print("future word count", count)
  }
}
// later in time
task.cancel()

  • In this example base is the root producer, which does not respond with a termination upon a detected cancellation signal, and this is totally legal and still cooperative.
  • map wraps the future value and transforms it, but it does not eagerly return nil itself on cancellation; instead it delegates that responsibility to its Base. In this case it inherits the non-cancellability from base and acts the same way.

What does AsyncSyncSequence type do?

Well, to me it's a construct which provides a transformation operator, async, just like map or filter. Therefore this sequence must respect the cooperative yielding of values from its base upstream. In this special case the base upstream is a plain Sequence, which has no notion of cancellation and which itself should never stop emitting values for such an event. That responsibility moves to the chain construction or the end consumer. Therefore it follows that the async operator should not eagerly add cancellation to the chain.

If one needs an immediate break on cancellation somewhere in the chain, even if the chain could gracefully shut itself down in a short time, we could still provide a special async sequence type that offers that eager capability.

// straw man operator name `eagerlyCancel()`
// will cancel immediately when it detects `Task.isCancelled` during `next()`
[1, 2, 3].async.eagerlyCancel().map { "\($0)" } 

// will emit "1", "2" and "3" and finish
[1, 2, 3].async.map { "\($0)" } 

To further amplify this: an AsyncSequence should react to a cancellation in the best possible way (which can mean delegating it to the upstream(s)) and terminate as soon as feasibly possible, BUT it does not have to terminate eagerly and immediately. Therefore the following example sequence is 100% valid and can be projected onto the point I'm trying to make regarding AsyncSyncSequence.

struct AsyncOneTwoThree: AsyncSequence {
  typealias Element = Int

  struct AsyncIterator: AsyncIteratorProtocol {
    var array = [1, 2, 3]

    mutating func next() async throws -> Element? {
      if !array.isEmpty {
        return array.removeFirst()
      }
      return nil
    }
  }

  func makeAsyncIterator() -> AsyncIterator {
    AsyncIterator()
  }
}
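To make that concrete, here is a small demonstration (repeating AsyncOneTwoThree so the snippet is self-contained) that such a sequence delivers all of its elements even when consumed from a cancelled task:

```swift
// A sequence that never checks Task.isCancelled keeps delivering its
// elements even inside a cancelled task; only its own exhaustion ends it.
struct AsyncOneTwoThree: AsyncSequence {
  typealias Element = Int

  struct AsyncIterator: AsyncIteratorProtocol {
    var array = [1, 2, 3]

    mutating func next() async throws -> Element? {
      if !array.isEmpty {
        return array.removeFirst()
      }
      return nil
    }
  }

  func makeAsyncIterator() -> AsyncIterator {
    AsyncIterator()
  }
}

let task = Task { () async throws -> [Int] in
  var collected: [Int] = []
  for try await value in AsyncOneTwoThree() {
    collected.append(value)
  }
  return collected
}
task.cancel() // the cancellation flag is set, but next() never consults it
let values = try await task.value
print(values) // [1, 2, 3] regardless of cancellation
```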

I will try to take a look at that in the following days (hopefully) and see how I can help here. We should also double check the async sequences from the Concurrency framework and decide what to do if some types shouldn't be that eager (e.g. Async[Throwing]Stream).

If that was not enough or convincing, here's the same sequence, but broken up into pieces:

func array_async_eagerlyCancel_map() {
  let array = ["swift"]

  var iterator = (array.makeIterator()) // pseudo `async` wrapping

  repeat {
    if Task.isCancelled {   //
      break // return `nil` // `eagerlyCancel()`
    }                       //

    guard let element = iterator.next() else { //
      break // return `nil`                    // pull from base / upstream
    }                                          //

    let count = element.count // `map` operator

    print(count) // simulated consumer

    break
  } while true
}

The cancellation flows from the consumer to some upstream that responds to it. If no upstream responds, no eager termination should be performed. In this example, though, we explicitly insert a middle operator right before the map to perform exactly that operation.

For me, there's a number of reasons I believe that AsyncSyncSequence should not attempt to eagerly cancel itself:

  • An asynchronous sequence that eagerly cancels itself is impossible to decompose into one that does not. However, it is trivial to add cancellation to a non-cancelling sequence. Either at the point of consumption or via an operator. This is similar to the arguments for ensuring that asynchronous sequences are back pressure supporting by default. An asynchronous sequence that absorbs back pressure is likewise impossible to decompose into one that does.
  • Non-async Sequences are naturally less resilient to being torn. Many Sequences adapted to be asynchronous won't be indeterminate like the example above. Many will have been designed to be non-reducible units of data for which unexpected truncation could lead to all sorts of unforeseen issues.
  • Most non-async Sequences finish immediately. With the exception of indeterminate sequences, which require special handling regardless of whether they are sync or async, a non-async sequence delivers all its elements and then finishes. Indeterminate sequences are the special case here, and as with the handling of their sync counterparts, it shouldn't be unexpected that they require special care.

Finally, wrt to this breaking expectations:

While there's some areas of the existing Rx model that I think we're missing, this is one area where I strongly feel that Swift's asynchronous sequences can (and should) break away from the Rx model. I know there was discussion that asynchronous sequences shouldn't take on what other reactive programming frameworks do without considering if there was a better way, I definitely feel this is one of those situations.

There is already some effort being expended on educating people that it is their responsibility to fulfil cooperative cancellation with Tasks (see the docs for Task); extending that to AsyncSequence is, in my mind, the natural continuation of that. I'd argue it's only those with prior Rx experience that would require a readjustment of expectations.

1 Like

To draw one last parallel between regular async functions and AsyncSequence, here's a simple example:

func noop() async { return } // async, cooperative, no cancellation

struct Noop: AsyncSequence {
  typealias Element = Never

  init() {}

  struct AsyncIterator: AsyncIteratorProtocol {
    func next() async -> Element? {
      nil
    }
  }

  func makeAsyncIterator() -> AsyncIterator {
    AsyncIterator()
  }
}

// hypothetical generator function
func noop() -> stream Never { return }

Even though it's a root AsyncSequence type, it does not have any cancellation requirements. It just finishes right away. With cooperative cancellation, termination through nil or a thrown error is a possibility for an AsyncSequence (just like a returned value or a thrown error for a task), but by no means a forceful / eager requirement.

1 Like

I think we are going in circles here. There are some points we all agree on:

  1. Algorithms should forward cancellation to the upstream instead of handling it on their own
  2. We should improve documentation around cancellation behaviours of the various roots and in general some documentation about expectations on AsyncSequences
  3. We should revisit the AsyncStream implementation. At least to align the throwing and non-throwing one.

Now to the point where we are still split:

Is cancellation graceful or forceful? From how the language works and how it is implemented in most APIs, cancellation is forceful. It is not a signal of "please stop whenever it makes sense" but rather "please stop as soon as possible". That also means any root AsyncSequence that might be infinite, or doesn't know when the next element is going to be produced, must check for cancellation. I agree with @Philippe_Hausler here that AsyncSyncSequence can contain infinite sequences and therefore must check for cancellation eagerly in each next() call.

IMO, as soon as you stop treating task cancellation as something graceful, and expect that you cannot make any assumptions about what arrives or doesn't when you get cancelled, your program becomes way more robust. If you want to signal a task that it should gracefully stop its work, then you should build a separate mechanism to propagate that signal. This could be done by just passing in another AsyncSequence that you combine with your existing one.

Unfortunately I cannot agree with this statement. This basically tries to enforce more opinionated rules on AsyncSequence types, which diverge significantly from regular cooperative task cancellation on other async constructs. If cancellation were forceful for tasks, they would eagerly kill child tasks and detach from their results instantly. That behavior isn't very useful or flexible, and I'm glad we don't have it. The only difference between an AsyncSequence and a regular async function is that sequences yield 0 to n values and then finish / terminate, but all of that can be traced back to a single async next function. Therefore that particular next function should be capable of doing exactly the same cooperative work on cancellation as a plain async function, nothing more, nothing less. In your description it would however mean that we limit that behavior by enforcing more rules, just because this is not a construct that simply returns or throws.

Cooperative cancellation really means: "please stop as soon as possible, but really do whatever".

IMO cancellation is the same for a Task as it is for an AsyncSequence: at some point you take a continuation, and that point needs to handle cancellation. As you said, next() is just an async method after all.

The "do whatever" part is not true. Cooperative means "please stop as soon as possible", since you just got cancelled.