In this thread I would like to discuss cooperative task cancellation in the context of AsyncSequence conforming types. Spoiler ahead: I personally think that some of the existing async sequences have unwanted behavior and are not cooperative, at least from my point of view. With this feedback I would like to spark a discussion about how we should evaluate proposed async sequence algorithms, and suggest that we also check the existing types which already landed in the _Concurrency module.
I prepared a few simple playground examples to showcase the topic that I would like to discuss.
The first test case is a trivial async function which prints 1, 2, 3 together with the current task's cancellation state, with a few seconds of sleep in between. The function deliberately does nothing in case of cancellation and just eventually completes. In other words, it makes full use of the cooperative cancellation of structured concurrency, but in this particular case it decides against an early or immediate exit.
func test_case_1() async {
    print(1, Task.isCancelled)
    await sleep(1)
    print(2, Task.isCancelled)
    await sleep(1)
    print(3, Task.isCancelled)
}
The related playground output looks as follows:
-- starting new test case --
1 false
-- test task cancelled --
2 true
3 true
-- test case finished - test task returned --
This behavior is great as it gives the developer full control over how cancellation should happen within a specific async task or function. The task can exit immediately, it can perform some remaining work first and still exit early, or it can ignore the cancellation entirely because its return value might be important and shouldn't be discarded.
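As a small illustration of the first option, here is a minimal sketch (it is not one of the original test cases and reuses the sleep helper from the preparation code at the end of this post) of a task that checks the cancellation flag and exits early:
// Hypothetical variant of test_case_1 that cooperatively exits
// as soon as it notices cancellation.
func test_case_1_early_exit() async {
    for step in 1...3 {
        if Task.isCancelled {
            print("cancelled at step \(step), exiting early")
            return
        }
        print(step, Task.isCancelled)
        await sleep(1)
    }
}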
If you can imagine, test_case_1 could be considered a stream of values, where each value must be yielded while ignoring the cancellation signal, because the task wants to finish gracefully!
This is exactly what my first intuition about AsyncSequence types and their cancellation behavior was. However, as I will demonstrate with test_case_2, this isn't the case for the Async[Throwing]Stream types.
test_case_2 wraps the exact same events into an AsyncSequence (an AsyncStream to be precise) and consumes it within a for loop. The stream's continuation is driven by an inner detached Task, and the outer cancellation is forwarded to that task via the continuation's onTermination handler.
func test_case_2() async {
    let stream = AsyncStream<(Int, Bool)> { continuation in
        let task = Task.detached {
            var element: (Int, Bool) = (1, Task.isCancelled)
            print("element: \(element) yield result: \(continuation.yield(element))")
            await sleep(1)
            element = (2, Task.isCancelled)
            print("element: \(element) yield result: \(continuation.yield(element))")
            await sleep(1)
            element = (3, Task.isCancelled)
            print("element: \(element) yield result: \(continuation.yield(element))")
            continuation.finish()
        }
        continuation.onTermination = { @Sendable _ in
            if !task.isCancelled {
                print("cancelling AsyncStream inner task")
                task.cancel()
            }
        }
    }
    for await value in stream {
        print("for loop value:", value)
    }
    print("exited for loop")
}
The result for this test case looks very different from before, as the Async[Throwing]Stream types are not cooperative during cancellation. In fact, as soon as the parent task cancels the stream, the stream terminates and closes its buffer, which then only permits draining the remaining buffered elements, but drops all further elements from the upstream (the detached Task in this case).
Here's the resulting output for test_case_2:
-- test case finished - test task returned --
-- starting new test case --
🟢 | element: (1, false) yield result: enqueued(remaining: 9223372036854775807)
🟠 | for loop value: (1, false)
✅ | cancelling AsyncStream inner task
-- test task cancelled --
❌ | exited for loop
-- test case finished - test task returned --
🟢 | element: (2, true) yield result: terminated
🟢 | element: (3, true) yield result: terminated
finishing execution
The emoji markers have the following meaning:
- 🟢 log from the upstream trying to yield a value into the AsyncStream
- 🟠 downstream (AsyncStream) consumption
- ✅ cancellation forwarding from the downstream to the upstream
- ❌ point in time registering when the downstream stopped consuming upstream values
As you can see from this example, the Async[Throwing]Stream types ignore the cooperative cancellation of the upstream. Here the upstream again decides not to cancel, because it may need to perform important computation and send the result downstream. This behavior is very surprising, as it simply cuts off the upstream from the downstream and does not give the upstream chain any chance to react to the cancellation signal in the way it sees fit.
This behavior is caused by the buffer's cancel() method implementations, which immediately call into the finish() method and thereby terminate the entire stream [1] [2].
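To make the producer's side of this concrete, here is a minimal sketch following the same pattern as test_case_2: the only notification the producing side ever receives is the onTermination callback, which reports .cancelled when the consuming task was cancelled, while any later yield merely reports .terminated.
func test_termination_signal() async {
    let stream = AsyncStream<Int> { continuation in
        continuation.onTermination = { @Sendable termination in
            // .cancelled if the consuming task was cancelled,
            // .finished if finish() ended the stream normally
            print("terminated:", termination)
        }
        Task.detached {
            await sleep(1)
            // reports .terminated if the stream was cancelled in the meantime
            print("late yield result:", continuation.yield(42))
            continuation.finish()
        }
    }
    for await value in stream {
        print("value:", value)
    }
}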
If the stream were a moving train, cancellation would immediately cut off and stop the "pulling" locomotive, while the rest of the train would unconditionally and uncontrollably roll forward past the locomotive until it eventually comes to a halt. Shouldn't the whole train rather decelerate and stop together on the stop signal (cancellation)?
I haven't looked at all the existing AsyncSequence types, so there might be more that act like this and do not cancel in a cooperative manner.
Here are two other concrete examples:
AsyncMapSequence
This type is fully cooperative, as its cancellation behavior depends entirely on the upstream Base async sequence.
public mutating func next() async rethrows -> Transformed? {
    guard let element = try await baseIterator.next() else {
        return nil
    }
    return await transform(element)
}
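As a quick sanity check, here is a small sketch (not from the package's tests): the map stage below never consults Task.isCancelled itself, so whether the loop stops early is decided entirely by the base sequence.
func test_map_forwarding() async {
    let base = AsyncStream<Int> { continuation in
        for i in 1...3 {
            continuation.yield(i)
        }
        continuation.finish()
    }
    // AsyncMapSequence merely forwards next() to the base iterator,
    // so its cancellation behavior is whatever `base` decides to do.
    for await value in base.map({ $0 * 2 }) {
        print("mapped value:", value)
    }
}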
AsyncSyncSequence
This type seems very trivial, but why is it cancelling the emission of the synchronous base iterator? Who decided on this behavior? The upstream in this case has no notion of cancellation, which implies to me that it shouldn't be cancellable at all.
public mutating func next() async -> Base.Element? {
    if !Task.isCancelled, let value = iterator?.next() {
        return value
    } else {
        iterator = nil
        return nil
    }
}
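For comparison, here is the implementation I would have expected, a hypothetical variant (not the actual code from the package) that drops the Task.isCancelled check and leaves any early exit to the consuming loop, as shown in the example further below:
public mutating func next() async -> Base.Element? {
    // Hypothetical: the synchronous base is always drained to its end.
    if let value = iterator?.next() {
        return value
    }
    iterator = nil
    return nil
}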
Yes, I understand that we might wish to exit the downstream consumption early, but in this particular case that should happen explicitly, which brings me to a recent and related question that I asked in this thread:
With all my respect to the few people who tried to answer that question, I felt like the answers weren't 100% confident. In fact, if we go back to the Async[Throwing]Stream types and look at how they currently behave, we find that an Async[Throwing]Stream will drain the pending values even after cancellation. However, the consuming part, like the for loop above, can still decide to exit faster than the stream can supply new values, which makes a Task.isCancelled check within the loop a totally reasonable thing to do.
That again would perfectly explain the proper cancellation strategy for AsyncSyncSequence:
func foo() async {
    let sequence = [1, 2, 3].async // AsyncSyncSequence
    for await value in sequence {
        if Task.isCancelled {
            break
        }
        await sleep(2)
        // consume `value`
    }
}
All these examples are gathered together to showcase that there seems to be no clear vision yet on how to decide on the right cancellation behavior for AsyncSequence conforming types. This is a very critical topic to nail down correctly, but as I just showcased with the Async[Throwing]Stream types, their cancellation is anything but cooperative, which will lead to bugs in people's code when they have to rely on that crucial part of structured concurrency.
I understand that other reactive frameworks such as RxSwift or Combine have similar abilities to dispose of or cancel subscriptions; however, those techniques differ as they are non-cooperative by design.
One last real-world example I was working on, where I extensively used the AsyncStream type to move into the async world. I have two sequences which are merged and emit data in some concrete order. That data must be transformed into an immediate signal with some context, which contains an inner stream. For simplicity, just imagine a reverse flatMap operation from an [a, b, b, a, b, b] sequence into [(a, [b, b]), (a, [b, b])]. If the consuming downstream cancels, I want that transformation to complete the currently active construction of such a value gracefully and finish the inner [b, b] stream in the most appropriate way possible, not just cut off the upstream and produce something like this:
// upstream
-a-b-b---a-b-b-----
// transformation
-(a, x)---(a, y)---
 ^        ^
 -b-b-|   ^
          -b-b-|
// downstream consumption + cancellation
-a-b-|
     ^ cancelled, cut off, incomplete data transmitted
// expected cooperative cancellation value
[(a, [b, b])]
// actual result -> undefined behavior
[(a, [b])]
I apologize in advance for the abstractness of this example.
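To make the shape of that transformation a little more concrete, here is a minimal sketch under simplified assumptions: a hypothetical Event enum stands in for the real element types and a hypothetical grouped(_:) helper stands in for my actual operator. The point is only to show where the cut happens: when the consumer is cancelled, the inner task is cancelled, the loop over the base ends mid-group and the currently open inner stream is finished prematurely.
enum Event { case a, b }

// Hypothetical grouping operator: for every `.a` it yields a pair of that event
// and an inner stream which receives the following `.b` events, until the next
// `.a` arrives or the base sequence ends.
func grouped<Base: AsyncSequence>(
    _ base: Base
) -> AsyncStream<(Event, AsyncStream<Event>)> where Base.Element == Event {
    AsyncStream { outer in
        let task = Task {
            var inner: AsyncStream<Event>.Continuation? = nil
            do {
                for try await event in base {
                    switch event {
                    case .a:
                        inner?.finish()
                        let innerStream = AsyncStream<Event> { inner = $0 }
                        outer.yield((event, innerStream))
                    case .b:
                        inner?.yield(event)
                    }
                }
            } catch {}
            // If the consumer was cancelled, the loop above already ended here,
            // so the currently open inner stream is finished with incomplete data.
            inner?.finish()
            outer.finish()
        }
        outer.onTermination = { @Sendable _ in task.cancel() }
    }
}
Under the current Async[Throwing]Stream behavior the downstream therefore observes the truncated [(a, [b])] from the diagram above instead of the completed [(a, [b, b])].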
Here's the preparation code to run the test cases from above:
import PlaygroundSupport
import Combine

let page = PlaygroundPage.current
page.needsIndefiniteExecution = true

// non-cancellable sleep helper
func sleep(_ seconds: TimeInterval) async {
    var subscription: AnyCancellable? = nil
    await withCheckedContinuation { continuation in
        subscription = Timer
            .publish(every: seconds, on: .main, in: .default)
            .autoconnect()
            .first()
            .sink { _ in
                continuation.resume()
            }
    }
    let _ = subscription // silence the read warning
}
Task {
    for test_case in [test_case_1, test_case_2] {
        print("-- starting new test case --")
        // start the test case
        let task = Task {
            await test_case()
        }
        // wait some time then cancel the task
        await sleep(1)
        task.cancel()
        print("-- test task cancelled --")
        // await the task to return
        await task.value
        print("-- test case finished - test task returned --")
    }
    // wait some extra time for more logs to arrive
    await sleep(10)
    print("finishing execution")
    page.finishExecution()
}