AsyncSequences and cooperative task cancellation

In this thread I would like to discuss the cooperative task cancellation in context of AsyncSequence conforming types. Spoiler ahead, I personally think that some of the exiting async sequences might have an unwanted behavior and are not cooperative, at least from my point of view. With this feedback I would like to spark a discussion regarding how we should evaluate proposed async sequence algorithms and that we should check the existing types which already landed with _Concurrency module.

I prepared a few simple playground examples to showcase the topic that I would like to discuss.

The first test case is a trivial async function which prints 1, 2, 3 and the current task cancellation state after a few seconds of sleep. The function itself does decide not to do anything in case of cancellation and just eventually completes. In other words it makes full use of the cooperative cancellation of structured concurrency, but in this particular case it decides against early or immediate cancellation.

func test_case_1() async {
  print(1, Task.isCancelled)

  await sleep(1)
  print(2, Task.isCancelled)

  await sleep(1)
  print(3, Task.isCancelled)
}

The related playground output looks as following:

-- starting new test case --
1 false
-- test task cancelled --
2 true
3 true
-- test case finished - test task returned --

This behavior is great as it gives the developer full control over how a cancellation should happen within a specific async task / function. The task can immediately exit, it can perform some extra remaining work first and still exit earlier or it can ignore the cancellation as its return value might be very important and shouldn't be discarded.

If you can imagine, test_case_1 could be a considered as a stream of values, where each value must be yielded by ignoring the cancellation signal, because the task want to finish gracefully!

This is exactly what my first intuition was about AsyncSequence types and their cancellation behavior. However as I will demonstrate with test_case_2, this isn't the case for Async[Throwing]Stream types.

test_case_2 wraps the exact same events into an AsyncSequence (an AsyncStream to be precise) and consumes it within a for loop. The stream's continuation is signaled through an inner detached Task and the same task is forwarded the outer cancellation via the continuation's onTermination handler.

func test_case_2() async {
  let stream = AsyncStream<(Int, Bool)> { continuation in
    let task = Task.detached {
      var element: (Int, Bool) = (1, Task.isCancelled)
      print("element: \(element) yield result: \(continuation.yield(element))")

      await sleep(1)
      element = (2, Task.isCancelled)
      print("element: \(element) yield result: \(continuation.yield(element))")

      await sleep(1)
      element = (3, Task.isCancelled)
      print("element: \(element) yield result: \(continuation.yield(element))")

      continuation.finish()
    }
    continuation.onTermination = { @Sendable _ in
      if !task.isCancelled {
        print("cancelling AsyncStream inner task")
        task.cancel()
      }
    }
  }

  for await value in stream {
    print("for loop value:", value)
  }

  print("exited for loop")
}

The result for this test case looks very different than before as Async[Throwing]Stream types are not cooperative during cancellation. In fact, as soon as the parent task cancels the stream, the stream will terminate and close its buffer which then will only permit to drain all remaining elements in the buffer, but it will drop all further elements from the upstream (Task in this case).

Here's the result output for test_case_2:

-- test case finished - test task returned --
-- starting new test case --
🟢 | element: (1, false) yield result: enqueued(remaining: 9223372036854775807)
🟠 | for loop value: (1, false)
✅ | cancelling AsyncStream inner task
-- test task cancelled --
❌ | exited for loop
-- test case finished - test task returned --
🟢 | element: (2, true) yield result: terminated
🟢 | element: (3, true) yield result: terminated
finishing execution

The emoji markers have the following meaning:

  • :green_circle: - log from the upstream trying to yield a value into the AsyncStream
  • :orange_circle: - downstream (AsyncStream) consumption
  • :white_check_mark: - cancellation forwarding from the downstream to the upstream
  • :x: - point in time registering when the downstream stopped consuming upstream values

As you can see by this example Async[Throwing]Stream types do ignore the cooperative cancellation from the upstream. In this example the upstream again decides not to cancel because it may need to perform important computation and send that over through the downstream. This behavior is very surprising as it simply cuts off the upstream from the downstream and does not provide any chance to react to the cancellation signal in the way the upstream chain would seem fit.

This behavior is caused by the buffer cancel() method implementations which does immediately call into finish() method, that will terminate the entire stream [1] [2].

If the stream was a moving train, it would immediately cut off and stop the "pulling" locomotive and the rest of the train would unconditionally and uncontrolled drive forward past the locomotive until it eventually decelerates. Shouldn't the whole train rather decelerate and stop on the stop signal (cancellation)?

I haven't looked at all the existing AsyncSequence types so there might be more that act like this and do not cancel in a cooperative manner.

Here are two other concrete examples:

AsyncMapSequence

This type is fully cooperative as its cancellation behavior is dependent on the upstream Base async sequence.

public mutating func next() async rethrows -> Transformed? {
  guard let element = try await baseIterator.next() else {
    return nil
  }
  return await transform(element)
}

AsyncSyncSequence

This type seems very trivial, but why is it cancelling the emission of the synchronous base iterator? Who decided this behavior? The upstream in this case, has no notion of cancellation and thus it implies to me that it shouldn't be cancelable at all.

public mutating func next() async -> Base.Element? {
  if !Task.isCancelled, let value = iterator?.next() {
    return value
  } else {
    iterator = nil
    return nil
  }
}

Yes I understand that we might wish to exit the downstream consumption earlier, but this should happen explicitly in this particular case, which brings me to one recent and related question that I asked in this thread:

With all my respect to the few people who tried to answer that question, I felt like the answers to this question weren't 100% confident. In fact if we go back to the Async[Throwing]Stream types again and look how they currently behave, we find out that an Async[Throwing]Stream will drain the pending values even after cancellation, however the consuming part, like the above for loop can still decide to exit faster than the stream can supply us with new values and therefore is the Task.isCancelled check within the loop a totally reasonable thing to do.
That again would perfectly explain the proper strategy for the cancellation of AsyncSyncSequence:

func foo() async {
  let sequence = [1, 2, 3].async // AsyncSyncSequence
  for await value in sequence {
    if Task.isCancelled {
      break
    }

    await sleep(2)
    // consume `value`
  }
}

All these examples are gathered together just to showcase that there seems to be no clear vision on how to decide for the right cancellation behavior on AsyncSequence conforming types yet. This is a very critical topic to nail down correctly, but as I just showcased with Async[Throwing]Stream types, the cancellation is everything else than cooperative which will lead to bugs in peoples code when they have to rely on that crucial pats of structured concurrency.

I understand that other reactive frameworks such as RxSwift or Combine have similar abilities to dispose or cancel the subscriptions, however those techniques differ as they are non-cooperative by design.


One last real world example which I was working on, where I extensively used AsyncStream type to move into the async world. I have two sequences which are merged and emit data in some concrete order. That data must be transformed into an immediate signal with some context, which contains an inner stream. For the simplification, just imagine a reverse flatMap operation from a [a, b, b, a, b, b] sequence into [(a, [b, b]), (a, [b, b])]. If the consuming downstream cancels, I want that transformation complete the currently active construction of such value gracefully and finish the inner [b, b] stream in the appropriate way possible, not just cut of the upstream and produce something like this:

// upstream
-a-b-b---a-b-b-----

// transformation
-(a, x)---(a, y)---
     ^        ^
     -b-b-|   ^
              -b-b-|

// downstream consumption + cancellation
-a-b-|
     ^ cancelled, cut off, incomplete data transmitted

// expected cooperative cancellation value
[(a, [b, b])]

// actual result -> undefined behavior 
[(a, [b])]

I apologize in advance for the abstractness of this example.



Here's the preparation code to run the test cases from above:

import PlaygroundSupport
import Combine

let page = PlaygroundPage.current
page.needsIndefiniteExecution = true

// non-cancellable sleep helper
func sleep(_ seconds: TimeInterval) async {
  var subscription: AnyCancellable? = nil
  await withCheckedContinuation { continuation in
    subscription = Timer
      .publish(every: seconds, on: .main, in: .default)
      .autoconnect()
      .first()
      .sink { _ in
        continuation.resume()
      }
  }
  let _ = subscription // silence the read warning
}

Task {
  for test_case in [test_case_1, test_case_2] {
    print("-- starting new test case --")

    // start the test case
    let task = Task {
      await test_case()
    }

    // wait some time then cancel the task
    await sleep(1)
    task.cancel()
    print("-- test task cancelled --")

    // await the task to return
    await task.value

    print("-- test case finished - test task returned --")
  }

  // wait some extra time for more logs to arrive
  await sleep(10)
  print("finishing execution")
  page.finishExecution()
}
5 Likes

In the meantime I came up with a custom workaround when building and using Async[Throwing]Stream types. I detach the call to next and forward the cancellation to the termination handler manually.

public struct AsyncCooperativeStream<Element> {
  let continuation: AsyncStream<Element>.Continuation
  let stream: AsyncStream<Element>

  public init(
    _ elementType: Element.Type = Element.self,
    bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded,
    _ build: (AsyncStream<Element>.Continuation) -> Void
  ) {
    var streamContinuation: AsyncStream<Element>.Continuation! = nil
    let stream = AsyncStream<Element>(elementType, bufferingPolicy: limit) { continuation in
      build(continuation)
      streamContinuation = continuation
    }
    self.continuation = streamContinuation
    self.stream = stream
  }
}

extension AsyncCooperativeStream: AsyncSequence {
  public struct Iterator: AsyncIteratorProtocol {
    let continuation: AsyncStream<Element>.Continuation
    let iterator: AsyncStream<Element>.Iterator

    public mutating func next() async -> Element? {
      await withTaskCancellationHandler {
        await withCheckedContinuation { continuation in
          // Detach the `next` method from the current parent task.
          Task.detached { [iterator] in
            var mutableIterator = iterator
            continuation.resume(returning: await mutableIterator.next())
          }
        }
      } onCancel: { [continuation] in
        // Forward the cancellation manually to the termination handler, then remove it so that
        // during a subsequent `next` call we do not signal another cancellation.
        let handler = continuation.onTermination
        continuation.onTermination = nil
        handler?(.cancelled)
      }
    }
  }

  public func makeAsyncIterator() -> Iterator {
    Iterator(continuation: continuation, iterator: stream.makeAsyncIterator())
  }
}

extension AsyncCooperativeStream: Sendable where Element: Sendable {}

Here's a 3rd test case which behaves cooperatively as originally thought.

func test_case_3() async {
  let stream = AsyncCooperativeStream<(Int, Bool)> { continuation in
    let task = Task.detached {
      var element: (Int, Bool) = (1, Task.isCancelled)
      print("element: \(element) yield result: \(continuation.yield(element))")

      await sleep(1)
      element = (2, Task.isCancelled)
      print("element: \(element) yield result: \(continuation.yield(element))")

      if Task.isCancelled {
        print("exiting test_case_3 early")
        continuation.finish()
        return
      }

      await sleep(1)
      element = (3, Task.isCancelled)
      print("element: \(element) yield result: \(continuation.yield(element))")

      continuation.finish()
    }
    continuation.onTermination = { @Sendable _ in
      print("-- trying to cancel async stream --")
      if !task.isCancelled {
        print("cancelling AsyncStream inner task")
        task.cancel()
      }
    }
  }

  for await value in stream {
    print("for loop value:", value)
  }

  print("exited for loop")
}

And the related output:

-- starting new test case --
🟢 | element: (1, false) yield result: enqueued(remaining: 9223372036854775807)
🟠 | for loop value: (1, false)
-- trying to cancel async stream --
✅ | cancelling AsyncStream inner task
🟢 | element: (2, true) yield result: enqueued(remaining: 9223372036854775807)
☑️ | exiting test_case_3 early
-- test task cancelled --
🟠 | for loop value: (2, true)
❌ | exited for loop
-- test case finished - test task returned --


Generally speaking I think we should build our mental model around the fact that the upstream should decide on how to cancel the chain. Ideally every downstream would signal the upstream that the consumer wants to cancel it. On the connection or consumption side of things we can only decide explicitly that we're no longer interested in the values. A finite async sequence should always complete or throw from the upstream process, it should never be implicitly torn apart from the upstream.

All that said, the root sequence should always have one there allowing three properties:

  • non-throwing or throwing
  • finite or infinite
  • cancellable or not
  • (theoretically also: hot or cold)

That would make AsyncSyncSequence in my opinion into a not cancellable, non-throwing but finite async sequence.

To break out our this sequence we either do that on the consumption end or we should introduce a cancellable sequence to do that earlier.

AsyncCancellable[Throwing]Sequence

extension AsyncSequence /* where Failure == Never */ {
  func cancellable() -> AsyncCancellableSequence<Self>
}

// somewhere else
let array = [1, 2, 3, 4]
// non-throwing, cancellable and finite async sequence of strings
let strings = array.async.cancellable().map { "\($0)" } 


The custom AsyncThrowingCooperativeStream wrapper allows me to trace a potential bug in RxSwift implementation of the values wrapper.

If I understand you correctly here then your assumption is that cooperative cancellation w.r.t. AsyncSequences is that they should only terminate their production when they don’t have any elements left (speaking about root AsyncSequences here)

However, cooperative cancellation in Swift means that if the task gets cancelled everything that the task is doing should stop as soon (and cooperatively) as possible. This means that any AsyncSequence, once it notices that the task is cancelled, must terminate its production. If AsyncSequences wouldn’t do this, then task cancellation could take forever.

Your are stating that upstream should decide how to cancel, however that doesn’t fit with the model of AsyncSequences being pull based. In that model the consumer is deciding when to stop requesting the next element and the upstream must respect it.

I also have to disagree with the statement that AsyncSequence should state if they are cancellable or not. IMO all AsyncSequences should behave the same here. If the task is cancelled then they should terminate the production. This allows you to reason about the behavior of them.

Right now all AsyncSequences that I know of are implemented the same way. They wrap their withCheckedContinuation call in a cancellation handler. Once they get informed about cancellation they terminate the production.

I haven’t come across a case where this is limitin, so I am interested in your underlying problem that you are trying to solve.

2 Likes

I don't feel this is the right description of what I meant. In short a cancellation should signal to an async sequence to also cancel the production, but the async sequence should also handle this cooperatively by design. The async sequence should not close the production right away if that's not cooperatively by its logic.

Async[Throwing]Stream are both not cooperative, as on cancellation it only closes the buffer from accepting further elements and ignores any asynchronous work the producer (aka. the stream continuation driver) wants to perform to unwind itself and possibly but not necessarily halt the sequence. Async[Throwing]Stream only allow the buffer to be filled while nothing is cancelled or the producer didn't finish. However as soon as cancellation arrives before the completion, the producer is cut off from the buffer and values simply gets lost and discarded without having a chance to cooperatively react to this signal. To be honest this almost smells like a bug as all it would require to fix this behavior is to remove the call to finish() from inside the cancel() method inside the Async[Throwing]Stream._Storage classes.

Yes this is what I'm aiming for.

Correct and this could be an expected behavior. If we had a multi-cast AsyncSequence type to mimic Passthrough/CurrentValueSubject publishers from combine, you would not expect that the main sequence itself would ever terminate other than on deallocation. It's the responsibility of the async sequence chain or the consumer to break out from the consumption loop.

Here I also almost agree. I had to put this quote below your next sentence as my previous answer is somewhat related. An async sequence must not terminate by design. It can terminate, and it would be great if it did, but that's up to the user to establish this proper behavior whenever needed. I can write an async method which will never terminate on a cancellation and this can be considered a design decision. An AsyncSequence should not differ in this behavior.

The cooperative cancelling is either handled by the stream if it's reasonable to do so, or the responsibility should be moved to the upstream (see AsyncMapSequence again).

This contradicts. If there's a cancelation signal raised, the sequence should hopefully halt asap., because that would be cooperative. Just preventing the emission from the chained upstream by guarding using for example Task.isCancelled is not cooperative (again see AsyncMapSequence, it's the base sequence that will handle this, not AsyncMapSequence itself).
Pulled base behavior is great and it's completely respected when everything is still handled cooperatively. There is zero change that I propose to this. In fact in my code base I'm using an iterator and pulling every next element one by one via back pressure and upstream buffering, which is an amazing ability to have.

I must disagree, see my previous response in regarding to an async function that must not terminate on cancellation by design.

func foo() async {
  while true {
    await sleep(...)
    pipe.signal(SomethingUseful())
    if privateCondition { // not `Task.isCancelled`
      break
    }
  }
}

We do not disallow these types of functions, so why should AsyncSequence differ in this regard? This significantly limits their use and design space.

For example AsyncSyncSequence should really not be cancellable, it should only re-emit the captured base sync iterator elements. If you need cancellation, you're free to do so by explicitly transforming the async sequence chain into a cancelable chain or you can break out from the pull based consumption manually while (a) stop consuming further values and (b) discarding those remaining values from the upstream.

That is not true and I have to make use of the AsyncMapSequence again to demonstrate my point. The sequence itself does not verify cancellation and does not halt the emission from the upstream during cancellation. This responsibility is moved cooperatively to the base sequence. If the base sequence would ignore the cancellation, so will AsyncMapSequence not terminate the production.

I became aware of this issue by intensively studying the behavior of Async[Throwing]Stream types. If for example you'd wrap some delegate type into a stream and then at some point cancel it, the delegate might still emit some last events and potentially a halting signal. Remember that we used the stream as a wrapper to emit those values to the consumer of it. However during cancellation those last events and the halting signal will potentially land into the void and we have no chance to properly and cooperatively finish the stream via the manually driven continuation.



Sorry but I feel like I sound like a broken record at this point.

TLDR; All existing and future AsyncSequence types should be carefully (re-)evaluated on how they should act on cooperative task cancellation and they should also considered as cancellable or not. IMHO it's a wrong to assume that all async sequences must be cancellable, that is simply not true, nor should every async sequence type decide on its own if it should drastically stop the emission of elements during cancellation, that should be forwarded to the upstream, just like tasks forward the cancellation though the task tree.

Hi @DevAndArtist, are you saying that your preferred behaviour is if a source sequence receives an element (prior to cancellation) the sequence should attempt to deliver it before ceasing production of elements?

In other words, it's fine if cancellation is asynchronous, or indeed if it takes some indeterminate amount of time, but if elements are pulled into an AsyncSequence (or pushed into its buffer in the case of Async[Throwing]Stream) an attempt to deliver those elements to the consumer must be made (whether or not the underlying Task has been cancelled)?

I hope my English isn't leaving me, but I tend to agree here. It's up to the consumer to decide whether he/she wants to consume the next value while there are some pending values left.

func consumer() async {
  for await value in asyncStream {
    if Task.isCancelled { // We know that `consumer` was cancelled from the
      break               // parent task, but the `asyncStream` still emitted
    }                     // a new value, in this specific case we don't need
    ...                   // it.
  }
}

The above behavior is 100% cooperative. Async[Throwing]Stream will still behave like this even today, when the buffer has some unconsumed pending elements post cancellation. Or in other words, it won't nil out immediately unlike for example AsyncSyncSequence in its current implementation.

If I wanted the immediate nil from the implicit iterator's next method, I could introduce a transformation AsyncSequence that would do exactly that.

for await value in asyncSequence.topOnCancellation() { ... }

Just to repeat myself from before. A cooperative (child-)task cancellation will not imply that the (task-)task will ever halt. By its cooperative behavior all we can do is hope that it will, but it's not forced to. Right now this behavior is not properly aligned with AsyncSequence types.

Here's a simple example that will not properly work with AsyncStream today.

let task = Task.detached {
  for i in (1 ... .max) {
    continuation.yield(i)
    await sleep(1) // non-cancellable sleep
    if Task.isCancelled {
      continuation.yield(i + 1) // emit two more values
      continuation.yield(i + 2)
      break // then break and finish
    }
  }
  continuation.finish()
}

yield(i + 1), yield(i + 2) and finish() will likely hit the wall as the buffer already terminated. However that unstructured task was designed be be cooperative on cancellation.


If we'd flip the sides on who is faster to emit and consume values, we still run into interesting and expected behavior.

// wrapped by AsyncStream
let task = Task.detached {
  for i in (1 ... 10_000) {
    continuation.yield(i)
  }
  continuation.finish()
}

// the consumer side
func streamConsumer() async {
  for await element in tenKStream {
    await sleep(1)
    print(element)
  }
}

If we'd cancel streamConsumer while it didn't consumed all elements, it will still print every single second a new value, as the buffer will still be filled with pending but not consumed values. The for loop won't break immediately during cancellation, thus streamConsumer remains cooperative.

If the user would want to break out faster from the loop it has the be made explicitly:

func streamConsumer() async {
  for await element in tenKStream {
    if Task.isCancelled {
      break
    }
    await sleep(1)
    print(element)
  }
}

There's a good can of worms with all this and all I'm trying to do with this thread is to raise your folks awareness. :slightly_smiling_face:

1 Like

In Combine there's the values property which transforms a publisher into an AsyncSequence. RxSwift mimicked this property, but their implementation was based on a non-cooperative AsyncThrowingStream.

At the point of writing this their implementation looks like the following code:


var values: AsyncThrowingStream<Element, Error> {
  AsyncThrowingStream<Element, Error> { continuation in
    let disposable = asObservable().subscribe(
      onNext: { value in continuation.yield(value) },
      onError: { error in continuation.finish(throwing: error) },
      onCompleted: { continuation.finish() },
      onDisposed: { continuation.onTermination?(.cancelled) }
    )
    
    continuation.onTermination = { @Sendable _ in
      disposable.dispose()
    }
  }
}

Since the stream is non fully cooperative, there's no issue during cancellation as the Observable won't even have to chance to signal the continuation anymore. However onDisposed: { continuation.onTermination?(.cancelled) } would be a bug if the stream was fully cooperative during cancelation. To fix this issue would need to be onDisposed: { continuation.finish(throwing: CancellationError() }.

This gives Observable the chance during cancellation transform its dispose signal into a proper CancellationError and push that through the AsyncThrowingStream.

I disagree with this. To me this sounds like you have the concepts the wrong way round. The difference between Combine/RX and AsyncSequences is that we are in a pull based pattern here. That means if a unicast AsyncIterator sees that the consuming task is canceled. Then it should do two things:

  1. It should return nil or throw a CancellationError from the next() call. This is in line with cooperative cancellation since the task indicated it is cancelled, we must stop production.
  2. Terminate its upstream production. Since it is a unicast sequence it knows that after its single consumer cancelled it can now stop the production.

This is correct and this a big difference between a unicast and a multicast AsyncSequence. If the AsyncIterator of a multicast sequence gets notified that its consumer got cancelled. The only thing that it should do is return nil or throw a CancellationError. It cannot affect the actual production of values since there might be other consumers.

I think a point we should clarify here is what type of AsyncSequence is Async[Throwing]Stream and I have to admit it is a bit in a weird place. Async[Throwing]Stream is an anycast sequence. That means it allows multiple consumers and the first queued up consumer receives the next value. However, at the same time Async[Throwing]Stream also cancels the whole thing when a consumer is cancelled. This make its behaviour weird since you would expect an anycast AsyncSequence to allow consumers to come and go. IMO we should clarify its intended behaviour and change it accordingly. The only problem here is that while it isn't API breaking it might be quite semantically breaking. So maybe we should put this change in with Swift 6.

I agree with one part of this. Cancellation should in most parts be handled by the root AsyncSequence and most algorithms don't have to think about it (with the exception of algorithms that currently need to create an unstructured Task to consume multiple upstreams like merge. These need to handle cancellation on their own).
However, I disagree with the larger statement. Cancellation should be the same for every AsyncSequence. This is the only way you can reason about this, especially in places where you get opaque AsyncSequences from APIs. It also makes reasoning in your own code way more straight forward. If you know your task is cancelled then you can expect the next next call to terminate. If your task is cancelled and you still wanted to consume the rest of the elements then I think you have to ask yourself why did your task get cancelled in the first place.

That's what I initially thought, and indeed, it does appear to behave that way on the brief experimentation I did with the type, but this line in the next method for its storage suggests otherwise. Unless I'm missing something elsewhere.

I will properly respond to your message a bit later this day. Right now I want just to point out the following trivial scenario.

If you can reasonably have an async function which does not cancel and for example returns a sequence of elements, so should able to build a non cancellable AsyncSequence, or otherwise you'd significantly limit the design and use space of async sequence types. If you can't reason about the termination of async functions that gets the task cancellation forwarded, you shouldn't avoid this for AsyncSequence types either. Both should behave exactly the same in regards to cooperative cancellation. Only that gives us the power to build whatever behavior we need, anything else is an artificial restriction.

func nonCancellable() async -> [Int] {
  await sleep(10) // non cancellable
  return Array(1 ... 10_000)
}

// non cancellable cooperative stream
AsyncCooperativeStream { continuation in 
  Task {
    await sleep(10) // non cancellable
    Array(1 ... 10_000).forEach { continuation.yield($0) }
    continuation.finish()
  }
}

To properly reason about the behavior of cooperative task cancellation an async construct must document its exact behavior. That's the only thing you can do about it.

Yeah to make matters worse AsyncStream and AsyncThrowingStream diverge in the implementation. The latter fatalErrors where as the former implements anycast. We have to fix that up as well

3 Likes

This sounds reasonable to me, and is supported by @FranzBusch's thoughts.

It would probably require the creation of a AsyncCooperativeStream to support the use case, but then I think it's reasonable to expect the 'transformation algorithms' (map, etc.) don't attempt to cancel unnecessarily.

I don't fully get this example. The usage side would be completely different for the two.

let values = await nonCancellable()

for value in values {}

vs

let stream = AsyncStream()

for await value in values {}

The major difference here is where the await keyword is placed. This is not only the point of potential suspensions but also the point where task cancellations happen.

I am with you on this. We should document it, but IMO the documentation should be "AsyncSequenceIterators should return nil or throw a CancellationError as soon as they notice that their consuming task is cancelled. They MUST NOT return any more elements after the consuming task got cancelled."

I had slightly a different impression, but it might be just me missing the exact point Franz was making. I'll come back to that later.

That's one of my points. Upthread the AsyncCooperativeStream fixed my issues and I discovered a good load of other cancellation bugs in our code base after migrating to it.

Things like map delegate the cancellation to its Base sequence, which makes the sequence that implements map actually fully cooperative. Great! That's why I keep repeating, that an AsyncSequence doesn't have to be cancellable by design, it's up the every specific conforming type, wether it's a transformation sequence type or the root sequence type.

My point with this example was that both sequences will not cancel when iterated over. Yes the await keyword and the suspensions points differ in placement, but the non-termination of the sequences holds.

Here, I completely disagree, this behavior is not cooperative. A sequence does not have to go with either of those as there are still things like infinite sequences and a cooperative ignoring of the cancellation signal.

Comparing this to a regular async function, which on cancellation can:

  • return / ignore the cancellation
  • return a partial value
  • throw CancellationError
  • throw a totally different error
  • never return at all (e.g. keep an active loop alive and keep processing)
  • trap

AsyncSequence by your suggestions is trying to limit that design space and eliminate several of these options.

Cooperative task cancellation should traverse from the consumer side up through the async sequence chain to find that sequence type which decides how to terminate further emission of the values. This basically just extend the cancellation tree from tasks into async sequences. It's a natural extension.

1 Like

Thinking about this a bit more, and perhaps I'm not understanding correctly, but would it not be difficult and brittle to isolate the responsibility of cancellation to a single point in the code?

It feels that the way it currently works is that if a Task is cancelled, any part of the async stack can go ahead and terminate its work – if it gets the opportunity.

And it kind of has to be this way as we don't really know which parts of the async stack will be running or suspended. If you try and isolate it to one place, that part may be suspended while there's some other long-running operation taking place.

I'm not saying you couldn't design an AsyncSequence pipeline this way if you needed to, but I'm not sure I would agree that it should be the default.

If I understood you correctly. If all async sequences were cooperative then you will by default eventually run into a sequence that initiates the termination of emission in whatever way it needs to, and if it still need to perform some work it will likely suspend. It's all natural like with regular task cancellation.

Always immediately returning nil or CancellationError from a sequence when it detects cancellation makes the whole concept of being cooperative on cancellation worthless on async sequences.

Yeah, I see your point. It makes sense to give people control on this, and maybe it's not the job of transformation sequences to make assumptions about how cancellation will be handled.

I definitely think this should be optional though, as in, if people want this behaviour in their source types, they should likely roll their own, or perhaps put out to pitch in Swift Async Algorithms for the AsyncCooperativeStream you describe above. (For example, I think the current behaviour of AsyncStream is probably a good default.)

In terms of the transformation sequences, It does seem to make sense that they leave the job of performing the cancellation to their upstream, or the method they wrap (i.e. the transformation closure in map). As you say, it should eventually run into the critical element producing source sequence which will be best equipped to perform a graceful termination.