AsyncSequences and cooperative task cancellation

Correct. That's one of my points. The task cancellation is forwarded through the async sequence chain, but the consumer of the sequence can decide to break out of consumption immediately after receiving the next value. That's a totally valid thing to do. And it would finally provide a clear answer to my previous question from this thread.

That should be fine as well. A task can only branch out if it uses child task creation via task groups or async let. Even if an async method such as AsyncStream.Iterator.next() were to capture one or multiple continuations from a parent task, we could still logically resolve and resume them. With a single consumer this is trivial, as there won't be another call to next from within the loop body, hence there's no continuation captured by the algorithm. If we created child tasks and each captured and cached another continuation, then they would resolve in the (undefined) order of their capture while the producer keeps emitting elements or while there are pending elements left. When the producer has finished the stream / buffer and we still have multiple continuations awaiting, we can resume them all by forwarding the termination (nil or an error).

:slight_smile: Happy to hear that. In my opinion AsyncSyncSequence should not cancel, as it's a transformation sequence type from sync into async, and the sync sequence is its non-cancellable base. Thus it's also non-cancellable. That would also indicate that we need an AsyncSequence type which would allow us to explicitly break out of / cancel a chain of non-cancellable sequences when Task.isCancelled is detected.

[1, 2, 3].async.stopOnCancellation().map { "\($0)" } // will cancel immediately
[1, 2, 3].async.map { "\($0)" } // will emit "1", "2" and "3" and finish
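
For illustration, here is a rough sketch of what such a stopOnCancellation() modifier could look like (the type and operator names are made up, not an existing API): it simply consults Task.isCancelled before pulling the next element from its base and finishes early once the task is cancelled.

struct StopOnCancellationSequence<Base: AsyncSequence>: AsyncSequence {
  typealias Element = Base.Element
  let base: Base

  struct AsyncIterator: AsyncIteratorProtocol {
    var base: Base.AsyncIterator

    mutating func next() async throws -> Base.Element? {
      guard !Task.isCancelled else { return nil } // break out of the chain eagerly
      return try await base.next()
    }
  }

  func makeAsyncIterator() -> AsyncIterator {
    AsyncIterator(base: base.makeAsyncIterator())
  }
}

extension AsyncSequence {
  func stopOnCancellation() -> StopOnCancellationSequence<Self> {
    StopOnCancellationSequence(base: self)
  }
}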

I cannot agree more here! :+1:

We should double-check all AsyncSequence types that we shipped for their behavior, including the algorithms, and re-evaluate where necessary.

I'm sorry for bringing this important topic up so late. :upside_down_face:


One more hypothetical example.

Let's conform Task<Success, Failure> to AsyncSequence (it would need typed throws, but let's ignore it for this example).

Task as an AsyncSequence would simply use withTaskCancellationHandler in its Iterator.next() method and just call self.cancel() on cancellation. It will then cooperatively either throw or return. A Task as an AsyncSequence can be used like Combine.Just or potentially Combine.Empty, depending on the configuration of the generic type parameters.

extension Task: AsyncSequence where Failure == any Error {
  public typealias Element = Success

  public struct AsyncIterator: AsyncIteratorProtocol {
    let task: Task
    var finished: Bool = false

    public mutating func next() async throws -> Success? {
      guard !finished else {
        return nil
      }
      defer {
        finished = true
      }
      return try await withTaskCancellationHandler {
        try await task.value
      } onCancel: { [task] in
        task.cancel()
      }
    }
  }

  public func makeAsyncIterator() -> AsyncIterator {
    AsyncIterator(task: self)
  }
}
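
A possible usage sketch for the conformance above (not tested): the wrapped Task behaves like a one-element async sequence, and cancelling the consuming task forwards the cancellation to the wrapped Task via the handler in next().

let word = Task<String, any Error> { "swift" }

let consumer = Task {
  for try await value in word {
    print(value) // prints "swift" once, then the sequence finishes
  }
}

// later: cancelling the consumer cooperatively cancels `word` as well
consumer.cancel()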

Editing these things in, as there's a three-post limit:

Why though? The debounce can tell the producer in the chain: "can you please stop producing as soon as possible, because I got cancelled". In other words, it would decelerate. However, the debounce does not have to debounce the finishing signal from the producer; it can immediately forward it by resuming the continuation (if it's based on one). That would be a cooperative transformation again.

-0---1-2-3---^4|--->
[ === debounce === ]
---0-------3-^--4|->
             ^ cooperative cancel signal

I cannot comment on merge as I haven't closely looked into its implementation, but logically it should just forward the cancellation to all producers and wait for the last of them to stop before stopping itself. This behavior would be cooperative.

---B--^|-------->
-A----^-A--|---->
[ === merge === ]
-A-B--^-A--|---->
      ^ cooperative cancel signal

In case of an upstream error:

---B-X--------->
-A---^--A-|---->
[ === merge === ]
     ^
     |
-A-B-x----X---->
__________________
-A-B------X----> // consumer

// if one upstream fails, the other one is cancelled cooperatively
// last A is discarded
// the first error is emitted when both upstreams terminate
// otherwise upstreams would outlive the `merge`-downstream

Using the non-throwing marble diagram and transforming one cancellation into CancellationError:

---B--^X-------->
-A----^^-A-|----> // second cancellation is a no-op
[ === merge === ]
       ^
       |
-A-B--^x---X---->
_________________
-A-B--^----X----> // consumer 
      ^ cooperative cancel signal
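
To make the intended shape concrete, here is a minimal sketch (not the actual merge implementation) of that cooperative behavior using a plain task group: cancellation is forwarded to every child producer, and the group only returns once the last of them has stopped.

func runTwoProducers() async {
  await withTaskGroup(of: Void.self) { group in
    for name in ["A", "B"] {
      group.addTask {
        while !Task.isCancelled {
          // … produce / yield the next element here
          await Task.yield()
        }
        print("producer \(name) stopped cooperatively")
      }
    }
    // Cancelling the surrounding task cancels both children, but
    // `waitForAll()` still waits until both have actually returned.
    await group.waitForAll()
  }
}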

Let's think about cooperative task cancellation in the context of AsyncSequence as a hopefully fast deceleration of all producers until they eventually stop and everything returns or throws an error. It's such a powerful ability for sequences that I haven't seen anywhere else. In fact, this is exactly why I think that cooperative task cancellation can be used to build graceful shutdown of a process or even the entire application.

@main struct Application {
  static func main() async { ... }
}

// the OS will do this then
let task = Task {
  await Application.main()
}

// graceful shut down as it will recursively spread through the entire
// async task tree and sequences and signal and mark everything as cancelled
// the application code will then try its best to shut down as soon as possible
// perform last save operation where needed and stop
task.cancel()

// give it some short window
await task.value
// otherwise kill the process forcefully

Regular (forceful) cancellation cannot be graceful, but "cooperative" cancellation can!

Minor correction: that is actually, as described, more like Combine.Future. Just and Empty are isomorphic to Publishers.Sequence with either 1 value or 0 values respectively.

Just so I understand correctly - the first part of your ask is that the rules for AsyncSequence implementors for algorithms that derive their values from another AsyncSequence (e.g. .debounce, .map, et al.) never resume the continuation on cancellation, but instead rely on the forwarding (or just the presence of) the task's cancelled state.

For example: an AsyncSequence that checks for cancellation every 1000 elements produced, and that is then connected to a debounce, would only produce a terminal event if the base AsyncSequence (the one checking every 1000 elements) produces a terminal event.
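
A rough sketch of such a base sequence, just to make the example concrete (the type name is hypothetical): it only consults Task.isCancelled every 1000 produced elements, so cancellation is honoured lazily, at the next checkpoint.

struct CheckEveryThousand<Base: AsyncSequence>: AsyncSequence {
  typealias Element = Base.Element
  let base: Base

  struct AsyncIterator: AsyncIteratorProtocol {
    var base: Base.AsyncIterator
    var produced = 0

    mutating func next() async throws -> Base.Element? {
      produced += 1
      if produced % 1000 == 0, Task.isCancelled {
        return nil // terminate, but only at every 1000th element
      }
      return try await base.next()
    }
  }

  func makeAsyncIterator() -> AsyncIterator {
    AsyncIterator(base: base.makeAsyncIterator())
  }
}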

More concretely: withTaskCancellationHandler's onCancel should only be used to forward cancellation to a pseudo child task and never to resume a continuation.

This logic seems reasonable and relaxes the state machine requirements for a few algorithms. To be clear: even resuming that continuation is still cooperative; I would just classify it as an eager versus a lazy cooperative cancellation system.

However I don't follow how this applies to AsyncSyncSequence. Sequence itself has no understanding of cancellation at all. So couldn't you easily get into problematic scenarios where you have indefinite Sequence definitions that are made to be constant producers of values via .async transforming the Sequence into an AsyncSequence?

e.g.

struct Indefinite<Element>: Sequence, IteratorProtocol {
  var value: Element
  func next() -> Element? { return value }
  init(_ value: Element) { self.value = value }
}

for await item in Indefinite("hello").async {
  await doStuff(with: item)
}

I have seen plenty of use cases in similar systems for this type of thing. Making it "just work" with cooperative cancellation to me makes sense and having it not play nicely with cooperative cancellation seems more like a bug (or at the least inconsistent)


That's a fair point, but I think the same would apply to a non-async sequence. You'd have an infinite non-async for-in loop unless you break out of it conditionally.

I think the trick here would be to use a map and for the end user/programmer to handle cancellation in the closure provided to map as they see fit – or at the point of consumption of course.

I'm not sure how you can terminate in map other than by throwing. Is that what you mean?

From a perf standpoint: since map is a value transform it is really performance sensitive - ideally it should defer cancellation to surrounding stuff. Checking cancellation on each pass is potentially kinda expensive.

UPDATE: Yeah, you're right. You can't use map in its existing form, you'd need an alternative algorithm. Something like:

func terminatingMap<Transformed>(_ transform: @escaping (Self.Element) async throws -> Transformed?) -> AsyncTerminatingMapSequence<Self, Transformed>

This would terminate the sequence if the closure returns a nil value.
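
A rough, untested sketch of what such an AsyncTerminatingMapSequence could look like under those semantics (the type only exists as a working title here):

struct AsyncTerminatingMapSequence<Base: AsyncSequence, Transformed>: AsyncSequence {
  typealias Element = Transformed
  let base: Base
  let transform: (Base.Element) async throws -> Transformed?

  struct AsyncIterator: AsyncIteratorProtocol {
    var base: Base.AsyncIterator
    let transform: (Base.Element) async throws -> Transformed?
    var finished = false

    mutating func next() async throws -> Transformed? {
      guard !finished, let element = try await base.next() else { return nil }
      guard let transformed = try await transform(element) else {
        finished = true // the closure signalled termination via `nil`
        return nil
      }
      return transformed
    }
  }

  func makeAsyncIterator() -> AsyncIterator {
    AsyncIterator(base: base.makeAsyncIterator(), transform: transform)
  }
}

extension AsyncSequence {
  func terminatingMap<Transformed>(
    _ transform: @escaping (Element) async throws -> Transformed?
  ) -> AsyncTerminatingMapSequence<Self, Transformed> {
    AsyncTerminatingMapSequence(base: self, transform: transform)
  }
}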

Let's say you want to guarantee that an async sequence of Fibonacci numbers delivers the sequence at least up to 21.

By using terminatingMap, I'm imagining:

// not tested, but hopefully illustrative
let fibonacci = FibonacciSequence()
  .async
  .terminatingMap { i in
    if Task.isCancelled && i > 21 { return nil }
    return i
  }

for await item in fibonacci {
  await doStuff(with: item)
}

Or simply at the point of consumption:

for await item in FibonacciSequence().async {
  if Task.isCancelled && item > 21 { break }
  await doStuff(with: item)
}

From a perf standpoint: since map is a value transform it is really performance sensitive - ideally it should defer cancellation to surrounding stuff. Checking cancellation on each pass is potentially kinda expensive.

Huh, interesting. Maybe there's room for algorithms to help with cancellation in this regard that don't incur an unnecessary transform.

EDIT: Actually, looking further I don't think the existing map could be used this way so it would need to be a specific algorithm that aids with cancellation in some way. I've updated the example with a rough idea using the working title terminatingMap.

I see, thank you for the clarification. I was thinking here about specifically constrained Tasks like Task<Success, Never> which will yield one value and then finish with nil, or possibly Task<Never, Never>, which immediately returns nil or possibly performs Task.yield() and then returns nil.
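
As a tiny, purely hypothetical sketch (it assumes a second, non-throwing conformance for Failure == Never existed alongside the throwing one shown earlier):

let just = Task<Int, Never> { 42 } // as a sequence: would yield 42, then finish

// for await value in just { print(value) } // would print "42" exactly once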

That looks correct to me. The downstream terminates after the upstream. See my pseudo marble diagrams where we have to introduce a cooperative task cancellation that propagates from the consumer through the async sequence chain up to the sequence that can best possibly handle the cancellation.

That sounds right, but it depends on the given algorithm.

This again depends on the algorithm. Immediately resuming the continuation seems like it would be equivalent to a mixed concurrency model, where on cancellation of a structured task an unstructured (pseudo) sub-task is cancelled, but not awaited. Therefore the pseudo sub-task could potentially outlive the structured one. This is currently a real possibility with an Async[Throwing]Stream wrapping a cooperative Task or an RxSwift.Observable type. Being fully cooperative, per the Swift structured concurrency documentation I could find, means that a child task cannot outlive the parent task.
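
A small sketch of that scenario (names are made up): the unstructured Task inside the stream keeps running even after the consuming task is cancelled, unless the onTermination handler explicitly cancels it.

func makeEvents() -> AsyncStream<Int> {
  AsyncStream<Int> { continuation in
    let producer = Task { // unstructured (pseudo child) task
      var i = 0
      while !Task.isCancelled {
        continuation.yield(i)
        i += 1
        try? await Task.sleep(nanoseconds: 100_000_000)
      }
      continuation.finish()
    }
    continuation.onTermination = { @Sendable _ in
      producer.cancel() // without this, `producer` would outlive the consumer
    }
  }
}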

That's exactly the point. The base, the sync Sequence, is not cancellable, and AsyncSyncSequence is just a transformational wrapper sequence that bridges the sync and async worlds. A transformational async sequence should base its cancellation behavior on its base sequence. Thus it's also non-cancellable.

As Franz and I mentioned somewhere upthread, it's up to the consumer or the chain constructor to ensure that a non-cancellable sequence is broken up during cancellation in the most appropriate way possible, if at all.

let sequence1 = infiniteSyncSequence
  .stopOnCancellation() // introduce a special modifier sequence for chains
  .map { … }
  .filter { … }

let sequence2 = infiniteSyncSequence.map { … }.filter { … }

for await element in sequence2 {
  if Task.isCancelled { break } // the consumer can do it here
}

Both options are almost the same, they just differ in points of time where such an infinite chain is broken or stopped from being consumed.

As discussed upthread, each sequence type and the modifier would / should receive documentation for its exact behavior. Therefore your Indefinite would propagate the non-cancellability through AsyncSyncSequence.

If on the other hand we made AsyncSyncSequence cancellable, as it is already implemented, then we might unexpectedly drop elements from the base sequence, and the user would need to create their own type to work around this behavior.

With a hypothetical generator function it could look somewhat like this:

func `async`<T: Sequence>(_ sequence: T) -> stream T.Element {
  for element in sequence {
    yield element
  }
}

Such a transforming generator simply ignores any cancellation, as it has to replay the entire sequence. If cancellation is needed in such a case, it must be introduced further down the async sequence chain or on the consumer's end.
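
Roughly how that hypothetical generator could be spelled today with a hand-written wrapper (the type name is made up): it simply replays the base sequence and never consults Task.isCancelled itself.

struct ReplayingAsyncSequence<Base: Sequence>: AsyncSequence {
  typealias Element = Base.Element
  let base: Base

  struct AsyncIterator: AsyncIteratorProtocol {
    var iterator: Base.Iterator

    mutating func next() async -> Base.Element? {
      iterator.next() // no cancellation check anywhere
    }
  }

  func makeAsyncIterator() -> AsyncIterator {
    AsyncIterator(iterator: base.makeIterator())
  }
}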

Sharing some feedback from my experience since I moved all my stream types to the custom wrapper, which made them fully cooperative. The logic I have been working on was designed to be cooperative in every corner. I discovered quite a few tiny bugs and mistakes in my algorithms. As soon as those were fixed, the system worked really nicely and everything acts predictably, sequentially, and more importantly gracefully.


This is where I kinda differ on opinions: I feel that any AsyncSequence that does not respond to cancellation in any form is breaking expectations. The cooperative term in "cooperative cancellation" means (for AsyncSequence): if the task iterating the async sequence is cancelled the sequence will eventually terminate, either with a nil from the iterator's next or by throwing an error.

I can accept that we perhaps shouldn't eagerly terminate - but ignoring cancel seems a step too far.

@DevAndArtist perhaps it might be a good contribution to submit some additional tests to validate using the marble diagrams in swift-async-algorithms (it has symbols for cancellations and terminal expectations). That way we can understand the scope of what the eager -> lazy set of changes would be.

In this particular example it's not really ignoring the cancellation. The cancellation just falls out.

Take the simple AsyncSequence-conforming Task above as an example. You can write it so that it eventually terminates, but not in response to a cancellation signal. This can still be a valid design decision for that root sequence.

let base = Task {
  let actor: SomeActor = ...
  while await actor.condition {
    if Task.isCancelled { print("sorry can't terminate yet") }
    await Task.yield()
  }
  return "swift"
}

// parent task
let task = Task {
  let sequence = base.map(\.count) // transformation via `map`

  for await count in sequence { // consumer
    print("future word count", count)
  }
}
// later in time
task.cancel()

  • In this example base is the root producer, which does not respond with a termination upon a detected cancellation signal, and this is totally legal and still cooperative.
  • map wraps the future value and transforms it, but it does not eagerly return nil itself on cancellation; instead it delegates that responsibility to its Base. In this case it will inherit the non-cancellability from base and act the same way.

What does AsyncSyncSequence type do?

Well, to me it's a construct which provides a transformation operator, async, just like map or filter. Therefore this sequence must respect the cooperative yielding of values from its base upstream. In this special case the base upstream is a plain Sequence, which has no notion of cancellation and which itself should never stop emitting values for such an event. That responsibility moves to the chain construction or the end consumer. Therefore it follows that the async operator should not eagerly add cancellation to the chain.

If one needs an immediate break on cancellation somewhere in the chain, even if the chain could gracefully shut itself down in a short time, we could still provide a special async sequence type that provides that eager capability.

// straw man operator name `eagerlyCancel()`
// will cancel immediately when it detects `Task.isCancelled` during `next()`
[1, 2, 3].async.eagerlyCancel().map { "\($0)" } 

// will emit "1", "2" and "3" and finish
[1, 2, 3].async.map { "\($0)" } 

To further amplify this: an AsyncSequence should react to a cancellation in the best possible way (that can be delegating it to the upstream(s)) and terminate as soon as feasibly possible, BUT it does not have to terminate eagerly and immediately. Therefore the following example sequence is 100% valid and illustrates the point I'm trying to make regarding AsyncSyncSequence.

struct AsyncOneTwoThree: AsyncSequence {
  typealias Element = Int

  struct AsyncIterator: AsyncIteratorProtocol {
    var array = [1, 2, 3]

    mutating func next() async throws -> Element? {
      if !array.isEmpty {
        return array.removeFirst()
      }
      return nil
    }
  }

  func makeAsyncIterator() -> AsyncIterator {
    AsyncIterator()
  }
}

I will try to take a look at that in the following days (hopefully) and see how I can help here. We should also double-check the async sequences from the Concurrency framework and decide what to do if there are some types that shouldn't be that eager (e.g. Async[Throwing]Stream).

If that was not enough or convincing, here's the same sequence, but broken up in pieces:

func array_async_eagerlyCancel_map() {
  let array = ["swift"]

  var iterator = (array.makeIterator()) // pseudo `async` wrapping

  repeat {
    if Task.isCancelled {   //
      break // return `nil` // `eagerlyCancel()`
    }                       //

    guard let element = iterator.next() else { //
      break // return `nil`                    // pull from base / upstream
    }                                          //

    let count = element.count // `map` operator

    print(count) // simulated consumer

    break // only one element in `array`, so we can stop after the first pass
  } while true
}

The cancellation flows from the consumer to some upstream that responds to the cancellation. If no upstream responds to it, then nothing should be eagerly performed. In this example, though, we explicitly insert a middle operator right before the map that performs exactly that operation.

For me, there's a number of reasons I believe that AsyncSyncSequence should not attempt to eagerly cancel itself:

  • An asynchronous sequence that eagerly cancels itself is impossible to decompose into one that does not. However, it is trivial to add cancellation to a non-cancelling sequence, either at the point of consumption or via an operator. This is similar to the arguments for ensuring that asynchronous sequences support back pressure by default: an asynchronous sequence that absorbs back pressure is likewise impossible to decompose into one that supports it.
  • Non-async Sequences are naturally less resilient to being torn. Many Sequences adapted to be asynchronous won't be indeterminate like the example above. Many will have been designed as non-reducible units of data for which unexpected truncation could lead to all sorts of unforeseen issues.
  • Most non-async Sequences finish immediately. With the exception of indeterminate sequences, which require special handling regardless of whether they are sync or async, a non-async sequence delivers all its elements and then finishes. Indeterminate sequences are the special case here, and as with the handling of their sync counterparts, it shouldn't be unexpected that they require special care.

Finally, with regard to this breaking expectations:

While there are some areas of the existing Rx model that I think we're missing, this is one area where I strongly feel that Swift's asynchronous sequences can (and should) break away from the Rx model. I know there was discussion that asynchronous sequences shouldn't take on what other reactive programming frameworks do without considering whether there was a better way; I definitely feel this is one of those situations.

There is already some effort being expended on educating people that it is their responsibility to fulfil cooperative cancellation with Tasks (see the docs for Task); extending that to AsyncSequence is, in my mind, the natural extension of that. I'd argue it's only those with prior Rx experience that would require a readjustment of expectations.


To draw one last parallel between regular async functions and AsyncSequence, here is a simple example:

func noop() async { return } // async, cooperative, no cancellation

struct Noop: AsyncSequence {
  typealias Element = Never

  init() {}

  struct AsyncIterator: AsyncIteratorProtocol {
    func next() async -> Element? {
      nil
    }
  }

  func makeAsyncIterator() -> AsyncIterator {
    AsyncIterator()
  }
}

// hypothetical generator function
func noop() -> stream Never { return }

Even though it's a root AsyncSequence type, it does not have any cancellation requirements. It just finishes right away. With cooperative cancellation, termination through nil or a thrown error (a returned value or an error for a task) is a possibility for an AsyncSequence, but by no means a forceful / eager requirement.


I think we are going in circles here. There are some points we all agree on:

  1. Algorithms should forward cancellation to the upstream instead of handling it on their own
  2. We should improve documentation around cancellation behaviours of the various roots and in general some documentation about expectations on AsyncSequences
  3. We should revisit the AsyncStream implementation, at least to align the throwing and non-throwing ones.

Now to the point where we are still split:

Is cancellation graceful or forceful? From how the language works and how it is implemented in most APIs, cancellation is forceful. It is not a signal of "please stop whenever it makes sense" but rather "please stop as soon as possible". That also means any root AsyncSequence that might be infinite or doesn't know when the next element is going to be produced must check for cancellation. I agree with @Philippe_Hausler here that AsyncSyncSequence can contain infinite sequences and therefore must check for cancellation eagerly in each next() call.

IMO, as soon as you stop treating task cancellation as something graceful and accept that you cannot make any assumption about what arrives or doesn't arrive when you get cancelled, your program becomes way more robust. If you want to signal a task that it should gracefully stop its work, then you should build a separate mechanism to propagate that signal. This could be done by just passing in another AsyncSequence that you combine with your other one.
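
One possible shape of such a separate signal, sketched with a plain task group (Event, handle, and both streams are placeholders; in practice you might instead merge the two sequences with an algorithms package):

struct Event {}
func handle(_ event: Event) async { /* … */ }

func run(events: AsyncStream<Event>, shutdown: AsyncStream<Void>) async {
  await withTaskGroup(of: Void.self) { group in
    group.addTask {
      for await event in events {
        await handle(event)
      }
    }
    group.addTask {
      var signal = shutdown.makeAsyncIterator()
      _ = await signal.next() // wait for the graceful-shutdown request
    }
    _ = await group.next() // whichever child finishes first wins
    group.cancelAll()      // then stop the remaining consumption
  }
}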

Unfortunately I cannot agree with the statement that cancellation is forceful. This basically tries to enforce more opinionated rules on AsyncSequence types which diverge significantly from regular cooperative task cancellation on other async constructs. If cancellation were forceful for tasks, then they would eagerly kill child tasks and detach from their results instantly. That behavior isn't very useful or flexible, and I'm glad that we don't have it. The only difference between an AsyncSequence and a regular async function is that a sequence yields 0 to n values and then finishes / terminates, but all of that can be traced back to a single async next function. Therefore that particular next function should be capable of doing exactly the same cooperative work on cancellation as a plain async function, nothing more, nothing less. Your description would however mean that we limit that behavior by enforcing more rules, just because this is not a construct that either returns or throws.

Cooperative cancellation really means: "please stop as soon as possible, but really do whatever".

IMO cancellation is the same for a Task as it is for an AsyncSequence: at some point somewhere you take a continuation, and that point needs to handle cancellation. As you said, next() is just an async method after all.

The "do whatever" is not true. Cooperative is "please stop as possible" since you just got cancelled.


From what I've seen, Task and most recommendations around structured concurrency recognise that it's not the job of the system to determine when "as soon as possible" is. That's left for the programmer. From the docs: "it’s the responsibility of the code running as part of the task to check for cancellation whenever stopping is appropriate".

The de facto plumbing for structured concurrency is AsyncSequence, and for me that puts them in the category of not having the available knowledge to determine when it actually 'is appropriate'. It's just plumbing.

It seems that responsibility should definitely fall under the remit of the programmer – as it does with the rest of structured concurrency. Having two different sets of rules around cancellation depending on the context will be quite confusing in my opinion.


That's the point: the cancelled async task decides how to react to cancellation, and it doesn't have to react at all if that's an appropriate response. That's why I said "but really do whatever". The task is signaled and marked as cancelled by the runtime, but the reaction is still handled by that task, not the parent task; the parent task will wait, indefinitely if it has to.

I'm sorry, but this contradicts my mental model. I cannot reason about a program where, during cancellation, I'm not allowed to unwind properly and the execution just halts. There might be systems where this is expected, and it's totally possible to build them even if we had cooperative AsyncSequence by default, as we can introduce helper operators to enforce eager cancellation where it would be feasible. But I cannot see this as a general default rule, because there's no going back and it would require me to re-build multiple algorithms from scratch to avoid this trap.

The effect of cancellation within the cancelled task is fully cooperative and synchronous. That is, cancellation has no effect at all unless something checks for cancellation.
Source

If you have an indefinite Sequence or AsyncSequence and nothing checks for cancellation, this will have no effect! This is by design and should not be ignored or discarded.

Ignoring this would be a missed opportunity to do things differently from what other reactive frameworks ever did or even had the technical ability to do.

Really, what calling the iterator's async next() method post cancellation means is: "hey, I flagged you as cancelled, are you done yet?"

The outcome of this request is only one of:

  • a returned value (new value or partial result)
  • nil (successful finish)
  • any thrown error (could be CancellationError)
  • or an indefinite suspension (ignoring the cancellation)
  • a trap

These are 100% the same options as the possible outcomes of a suspended but cancelled task that some parent task awaits to resume.

If prior art helps: Kotlin Coroutines also chose to stick to cooperative cancellation by default, with a limited set of deliberate exceptions (the flow builder and multicast flows).

This includes their AsyncSyncSequence equivalent not checking for cancellation unless an opt-in is applied via the cancellable() operator, which checks for cancellation on every value emitted by the upstream producer.

—

It is perhaps worth considering such standardisation: a producer by default should continue to yield as long as the consumer demands it, even if the producer task itself is cancelled. Eager cancellation should be opt-in. This extends to any future buffering and multicast primitives.

The most important rationale for this IMO is reasoning of lifetime. The producer and the consumer may or may not belong to the same task hierarchy, hence potentially having a decoupled lifetime. Adhering to “no eager cancellation by default” (i.e. unless opt-in) simplifies the mental model because it then inherently guarantees “exactly-once delivery unless consumer cancels” as the default, no matter what happens to the producer task — which is opaque to and out of control of the consumer. In my perception, this also matches the general expectation of streams-of-values API users.

This does not mean producers and especially APIs that vend AsyncSequence have no control to eagerly cancel. They can opt-in themselves as needed, and inform their API users through API documentation.


Are there any updates on this in the documentation or the AsyncStream implementations?

I am currently struggling with the same problem of cooperative cancellation with async streams.

I have another example that I would like to handle with AsyncStream preferably.

I want to use AsyncStream for the OTA update of a Bluetooth device.
The user can abort/cancel the OTA update at any point, in code it could look like this:

// OTA update stream of events
// stored so that cancelUpdate() can reach the running task
private var updateTask: Task<Void, Never>?

func startUpdate() {
    ...
    updateTask = Task {
        for await event in startUpdateStream() {
            // handle event
        }
    }
    ...
}

func cancelUpdate() async {
    updateTask?.cancel()
    // await the update task, as the cancellation takes time
    await updateTask?.value
}

The thing is that during the cancellation the iOS app needs to send a BLE command to stop the OTA update on the BLE device; the device needs to process that and respond that the OTA update was canceled. Only then does the iOS app know that the device is in an appropriate state after the OTA update cancellation.

That's why I would like to await updateTask.value after the cancellation to be sure that the cancellation has been fully processed. The stream wouldn't be terminated with nil or throw right away as internally it would wait for the BLE device response that the OTA update has been canceled.

It would also better represent the intent of not allowing the OTA update to start again right after the cancellation (as the cancellation might still be in progress).

I'm not exactly sure if this would correctly model my requirement but it seems like another use case for a cooperative AsyncStream cancellation.
