Why does AsyncThrowingStream silently finish, without error, if cancelled?

It appears that AsyncThrowingStream doesn't distinguish between the actual end of the sequence vs iteration being aborted due to task cancellation, because AsyncThrowingStream behaves exactly the same way for both situations - it just returns nil from next.

It also does so immediately, irrespective of what the producer driving the continuation is actually doing. It doesn't allow the onTermination handler to actually perform the cancellation. Which also can mean that the continuation is used - e.g. yield or finish(throwing:) - after the stream has already communicated [false] end-of-stream to the consumer - which seems like it must be illegal? It's certainly semantically invalid.

How can I 'intercept' the enclosing task's cancellation in order to handle it myself and perform the proper sequence, and ensure the consumer actually knows the stream was cancelled [by throwing an error up to them]?

The best workaround I've found so far is to separately communicate the intent to cancel the task to my consumer first, so that it knows when it sees a supposed end of iteration that it's a lie and actually it was cancelled. But this is quite a lot of extra work. :confused:

Returning nil on cancellation is a totally valid thing for AsyncSequences to do. In fact it is the only thing for AsyncThrowingStream to do since the streams Failure type might be a specific error so it can’t just throw a CancellationError.

Making the behavior on cancellation configurable is something that has come up more often and I think is something we can experiment with. The only thing that I am worried about is that somebody will just ignore the cancellation once we make it customizable which will block the whole consumer task from getting cancelled. Once the consumer task is cancelled it’s going to be hard to do any work with potentially returned values anyways since any call to an async throwing method after next returned a value might just throw a CancellationError.

I am curious what cancellation sequence you want to see in the consumer here? Do you need to clean up some resources? Could we potentially model this with a with style method somewhere?

2 Likes

Cooperative cancellation of AsyncStream is missing.
Related topic https://forums.swift.org/t/asyncsequences-and-cooperative-task-cancellation

I miss that possibility too. It feels inconsistent with the general cooperative cancellation rules.

In comparison, Kotlin's Flow cancellation is cooperative by design

AsyncStream is implementing cooperative cancellation. Cooperative cancellation in Swift means that once a task is cancelled the task should come to an exit as fast as possible. To implement this AsyncStream is setting up a task cancellation handler when it has to suspend the consumer.

Now what AsyncStream doesn't offer is allowing the producer to customise what to do when the consumer gets cancelled. It might be that a producer wants to send an extra element or throw a CancellationError. Those might be reasonable things to do for specific use-cases and can be achieved by implementing your own root AsyncSequence. Importantly though, AsyncStream does support cooperative cancellation.

One problem with customising the cancellation behaviour of a root AsyncSequence is the usefulness of it. As a hypothetical example:

let (stream, source) = MyCustomAsyncStream<Int, Never>.makeStream()
rootAsyncSequence.onConsumerCancellation = {
    // When the consumer gets cancelled we want to send two more elements and then finish
    source.send(1)
    source.send(2)
    source.finish()
}

let task = Task {
  for await element in stream {
    print(element)
    try await Task.sleep(for: .seconds(1))
  }
}

// Cancelling the task after 1.5 seconds
try await Task.sleep(for: .seconds(1.5))
task.cancel()

In the example above, MyCustomAsyncStream provides a customisation point for what happens if the consumer gets cancelled. Here we are sending a few more elements and finish the source. However, in the task where we consume we are sleeping for a second after each element. As a result once our consumer task gets cancelled we won't see the last elements since the Task.sleep will throw and we will exit the loop.

The point I am trying to make is that customisable consumer cancellation isn't often as helpful since once a task is cancelled it is very hard to make more progress in that task without hitting another method that checks for cancellation.

Kotlin's cooperative cancellation is very similar to Swift's. In Kotlin suspending coroutine functions are checking for cancellation once they have to suspend. In Swift, the continuations don't do this automatically but you can setup task cancellation handler and mimic that behaviour. In fact, it is recommend to always setup a cancellation handler around every withContinuation to support cooperative cancellation.

4 Likes

I know, and I even sometimes implement my own iterators this way when it's convenient to me. But this is a core library function, and it feels a bit heavy-handed for the library to be forcing this upon me. :confused:

Right - this came up during the review of typed throws as an example of why sum types are needed, so that it could be declared throws(Failure | CancellationError).

I don't think this is AsyncThrowingStream's problem, if it happens - that's coder-error. It's no different to any other situation where cancellation isn't correctly handled (or isn't handled at all).

In a nutshell, I'm downloading files. As a stream of byte buffers. It's important to distinguish between "this is actually the whole file" and "this is not the whole file", because it may even be possible to parse the data in either case and not get any apparent errors. e.g. NSImage is super forgiving about corrupt / truncated files - it'll just render the missing part of the image as opaque grey - which is great, I suppose, if you're trying to model classical web browser behaviour, but it's really super frustrating when you actually need the whole, correct image for your program to function. :slightly_smiling_face:

Strictly-speaking I just need it to throw on cancellation - it doesn't particularly matter what it throws. But it would be nice if it lets me throw, because then I can let URLSession produce the error and it fits neatly into my existing error handling (since there's plenty of other cases I have to handle anyway, and they're all wrapped up in Cocoa error types - it'd be nice to not have heterogeneity in how I extract the actual error reason).

Incidentally, I've also realise that in this specific case another workaround is to not actually cancel the Task, but instead to expose the underlying URLSession task and cancel that instead. Arguably that's better than my prior workaround, but it's still a pretty gross - and unreliable - abstraction violation (now instead of returning just the AsyncThrowingStream from my download function, I'll have to return a tuple with the implementation detail URLSession task in it too, and cross my fingers the caller never screws up / forgets about this contract and cancels the Task instead).

Is that the overriding goal, though?

Cancelling promptly is of course always a goal, because it's nice to be responsive and to not waste resources. But, clean cancellation is surely more important, right? Having library code just abruptly - and silently - kill itself in the middle of your data flow graph is surprising and hampers clean cancellation, and clean-up.

In any case, the crux of the problem isn't really about how much 'initiative' AsyncThrowingStream shows re. cancellation, it's that it just silently ends the iteration, rather than actually signalling the cancellation. I wouldn't be that bothered, as noted earlier, if AsyncThrowingStream did just immediately throw an error. That it's not even letting me throw an error, that's a problem.

2 Likes

That is simply not correct. This is a goal, not a general rule. Cooperative means "the task should try to cooperate", it is not enforced to end immediately / eagerly. A task might not even check for cancelation at all, or it is already past that check. Even if it does check for the cancellation, it gets the chance granted to handle this signal in the most appropriate manner. This is the definition of being cooperative here. Async[Throwing]Stream is just broken as it goes against this principles no matter how hard you're trying to defend it.


I don't want to rehash the past thread. To satisfy all use cases Async[Throwing]Stream should provide a (back ported) option to opt out of the eager default cancellation into the true cooperative cancellation.

1 Like

I cannot guarantee how stable this hack is, but the trick I used to workaround this pain point was to detach the stream from the main async chain. :/

public struct AsyncThrowingCooperativeStream<Element, Failure> where Failure: Error {
  let continuation: AsyncThrowingStream<Element, Failure>.Continuation
  let stream: AsyncThrowingStream<Element, Failure>

  public init(
    _ elementType: Element.Type = Element.self,
    bufferingPolicy limit:
      AsyncThrowingStream<Element, Failure>.Continuation.BufferingPolicy = .unbounded,
    _ build: (AsyncThrowingStream<Element, Failure>.Continuation) -> Void
  ) where Failure == any Error {
    var streamContinuation: AsyncThrowingStream<Element, Failure>.Continuation! = nil
    let stream = AsyncThrowingStream<Element, Failure>(
      elementType,
      bufferingPolicy: limit
    ) { continuation in
      build(continuation)
      streamContinuation = continuation
    }
    self.continuation = streamContinuation
    self.stream = stream
  }
}

extension AsyncThrowingCooperativeStream: AsyncSequence {
  public struct Iterator: AsyncIteratorProtocol {
    let continuation: AsyncThrowingStream<Element, Failure>.Continuation

    // NOTE: This is `@unchecked Sendable` because `AsyncThrowingStream._Context` just captures
    // `AsyncThrowingStream._Storage` which itself is `@unchecked Sendable`..
    struct _Box: @unchecked Sendable {
      let iterator: AsyncThrowingStream<Element, Failure>.Iterator
    }
    let box: _Box

    public mutating func next() async throws -> Element? {
      let box = self.box
      return try await withTaskCancellationHandler {
        try await withCheckedThrowingContinuation { continuation in
          // Detach the `next` method from the current parent task.
          Task.detached {
            var mutableIterator = box.iterator
            do {
              let element = try await mutableIterator.next()
              continuation.resume(returning: element)
            } catch {
              continuation.resume(throwing: error)
            }
          }
        }
      } onCancel: { [continuation] in
        // Forward the cancellation manually to the termination handler, then remove it so that
        // during a subsequent `next` call we do not signal another cancellation.
        let handler = continuation.onTermination
//        continuation.onTermination = nil
        handler?(.cancelled)
      }
    }
  }

  public func makeAsyncIterator() -> Iterator {
    Iterator(continuation: continuation, box: Iterator._Box(iterator: stream.makeAsyncIterator()))
  }
}

Haven't checked it against Swift 6 nor do I have unit tests for this.

as a practical matter, Franz is right. the reality on the ground today is that you cannot realistically expect anything async to succeed after the current task has been cancelled. therefore, you should not use cooperative cancellation with anything that requires nontrivial (async) cleanup, the right approach is to model this explicitly at the application level.

in my opinion, cooperative cancellation has been somewhat oversold as a way for developers to “not have to think about cancellation”. it’s useful for very simple things, like aborting a deadline enforcement, but it’s no substitute for actually defining all of your exit paths.

1 Like

Allow me to disagree with the first bit and re-iterate that the main issue is not about the second part. I never stated that I want to proceed with any further structured async (sub-)tasks from the context of a cancelled task. All I want and need is the exact same ability to freely be able to handle the cancellation signal and unwind the async stream by properly shutting it down the way I see fit. I certainly do not want the stream to be forcefully killed under my fingers without me being able to react to the cancellation. By the end of the day, I'm the producer of the events and the async stream is an intermediate convenience type to enter the AsyncSequence world. However this particular intermediate type decided to be non-cooperative in terms of cancellation and be eager instead.

func foo() {
  while true {
    // never shut down on cancellation
  }
}

let task = Task {
  foo()
}

task.cancel() // it will not shutdown because `foo` decided so

An AsyncStream on the other hand will enforce an eager shut down, no matter how the producer drives it. This is wrong. It gives you zero chance to properly react, it cuts of the buffer and you cannot do anything about it.

let mappedStream = someStream.map { ... }

What if you were to cancel a chained stream. Why wouldn't async map not eagerly cancel and stop right there? Why does the cancellation still propagate further through the upstream? The answer is simple, map is implemented in a cooperative way, Async[Throwing]Stream is not.

1 Like

Another workaround (in some cases) is to call finish(throwing:) first, i.e. in the onTermination handler, before it returns. It's not perfect because it precludes passing up the URLSession cancellation error, but that's a burden of extra error handling code, not technically a functional problem.

I originally thought this wasn't viable because I thought it was illegal to call finish(throwing:) multiple times, but I find that in fact it's explicitly permitted and works as best as you can expect - second and subsequent calls are just completely ignored. Per the docs.

I did consider using a semaphore (or similar) to block onTermination from returning until the cancellation has been fully, properly handled, but I'm not sure how well AsyncThrowingStream will react to that blockage. It's not holding its lock while calling the termination handler, but it's not clear what the calling task / thread context can be.

1 Like

Yes, we agree that AsyncStream handles the cancellation in the sense of the Task Swift cancellation.
But it does not allow us to handle fully the cooperative cancelation in the sense that we discuss here.

Maybe for the most common use cases, current behavior is fine.
Still, I'm not at all convinced that there is no need to allow handling the cancellation in the producer side of the stream in case of cancellation to give the developer full control over the behavior.

Even by changing the example code by removing the Task.sleep, it makes perfect sense.

let (stream, source) = MyCustomAsyncStream<Int, Never>.makeStream()
rootAsyncSequence.onConsumerCancellation = {
    // When the consumer gets cancelled we want to send two more elements and then finish
    source.send(1)
    source.send(2)
    source.finish()
}

let task = Task {
  for await element in stream {
    print(element)
    // REMOVE Task.sleep
  }
}

// Cancelling the task after 1.5 seconds
try await Task.sleep(for: .seconds(1.5))
task.cancel()

There is no end of examples that could benefit from having full control over the cancellation (mine).

I would say the opposite.
Cooperative cancellation in my opinion was a great design choice by not treating API users as “not have to think about cancellation”. Rather have to think about the cancellation.

I think this topic was created, and others exactly because of "we think about cancellation" but AsyncStream does not allow to fully control that.

I would disagree. Why not use the Swift concurrency model for asynchronous work that requires action on Task cancellation?
Then I don't see much sense why cooperative cancellation would be introduced anyway.

1 Like

That is another topic for discussion. But I'm not sure if that was the best design choice to allow easily create continuation without having to think about cancellation. Also, how not obvious is and tricky in details is to create good continuation cancelation handling

1 Like