I'm open minded if the "default" should be different, but this should be clearly communicated and well documented. Every AsyncSequence type should state how it performs the cancellation, if at all or if it delegates it to the upstream. That is really a very powerful tool at our disposal.
That's essentially my goal with this thread. I've been working extensively with async sequences and (un-)structured concurrency constructs to build a very robust logic that handles a very critical part of our application and which is able to gracefully terminate with the help of cooperative task cancellation. As soon as I discovered some potential flaws in the implementing sequence types I started thinking why this isn't the default already and brought this up to community's attention.
All of this does not prevent being pull based and we're still keeping back pressure fully in tact.
Also let me try to demonstrate one of the original examples I had upthread. I'll try to keep it as simple as possible.
Imagine we have a (potentially infinite) hot source stream of values such as this: [a, b, b, b, c, a, b, x, a, b, b, c, ...]
There's a concrete order in this source sequence.
a is always an initial signal event
b is some data transmission that needs to be accumulated
c is a signal that there won't be any b elements and the transmission finished
x is only emitted if there was no b or c for too long, it's a custom timeout event, without terminating the source sequence in an error
last rule is that a starts only after a c or x, and b only after an a or another b
I wrap this sequence into an AsyncCooperativeStream which transforms it into signals in a reversed flatMap manner:
inner [...] construct are also consumable AsyncSequences
whenever the consuming side of that transformed sequence cancels, I want the transformation to gracefully build the last element it can build (only if it already started) and then terminate.
if for example the termination happened at the point of time of the second a event, the sequence will cooperatively terminate with: [(a, [b, b, b, c]), (a, [b, x])]
the result for the current AsyncStream would however end up with corrupt and incomplete transmission:
[(a, [b, b, b, c])]
subsequent (a, [b, x]) could still be build but it will hit the terminated buffer
I still have the feeling you are mixing up two things here. Task cancellation and termination of the AsyncSequence. Termination means the production stopped. This can be due to any reason and is not in control of the consumer. Cancellation is the consumer indicating that it is no longer interested in elements and signals to the iterator that it should stop producing elements as soon as possible.
From an implementation point of view in AsyncSequences the right thing to when it has to create a continuation, is to wrap that continuation with a cancellation handler, e.g.:
withTaskCancellationHandler {
withCheckedContinuation { continuation in
// Store the continuation somewhere
}
} onCancel {
// Clean up the continuation
}
You will find the above pattern in almost any root AsyncSequence and also in a few async algorithms which need to create continuations. The above pattern also means that if the consuming task is cancelled we will get into onCancel right away. There is no other way with structured Concurrency to implement cancellation. What you will also find in most AsyncSequences is that they implement a fast-path before creating a continuation. This fast-path is called when there are buffered elements to avoid creating continuations and setting up cancellation handler. (Fast path in NIOAsyncSequenceProducer)
Don't get me wrong, I have been using AsyncSequences extensively in production use-cases and I have never come across task cancellation being a problem. That's why my question to you is: Why do you still care about elements if your task is cancelled? Why is your task getting cancelled in the first place?
Even if we would go ahead with your proposal and say that root AsyncSequences can define their cancelablity then we will have serious problems implementing this:
Often the only way to avoid doing this is creating unstructured Tasks which should be avoided
Some algorithms can't inherit the cancelability since they have to create continuations of their own.
Edit: The second problem is a bigger problem since these algorithms need to create unstructured Tasks which is a big problem by itself.
The async sequence can be considered as a running process. That process is requested to shut down gracefully on cancellation. It makes no sense to just cut it immediately apart from the consumer. If you're into analogy: If a driver on the road sees a red light he will slowly start decelerating before the vehicle stops, otherwise he'll inherit the kinetic forces and fly out the window. In my case I want to handle cancellation as gracefully as possible and I'm relying heavily on cooperative task cancellation, even inside async tasks.
In several cases it's because some parent state changed and requires a graceful shutdown via collectLatest before starting the next one. Gracefully shutting down a task also means to perform clean up operations where needed.
Judging by examining the implementation of Async[Throwing]Stream it seems somewhat doable.
I don't think this is true. Cancellation is not graceful on the opposite cancellation is forceful. If you want graceful termination you should not use cancellation for that. The whole ecosystem is build on this assumption and tries to aggressively stop work once cancellation has been triggered.
We have a similar problem in the server ecosystem where we want to gracefully shutdown servers, but cancellation is not the way to achieve this.
Well I have to disagree with you, as it's the whole point of the main term here: "cooperative"
Cooperative cancellation is not forceful and it really shouldn't be so. Cancelling as fast as possible a great goal, but it's not always necessary and it can be done gracefully. There's no point rejecting that.
The way I've interpreted what @DevAndArtist is saying is this: If you forget AsyncSequence for a moment and look at Swift's structured concurrency, there is a concept of 'cooperative cancellation'.
What 'cooperative cancellation' means, is that, unless the underlying program 'takes part' in cancellation, by checking Task.isActive or using withTaskCancellationHandler the Task will continue running as if it wasn't cancelled at all.
This is by design. It avoids torn data, and is a very useful property.
Now back to AsyncSequence, while the AsyncSequence type could opt in to the same level of flexibility as structured concurrency in general (i.e. allowing the programmer to determine where, when and how the sequence is cancelled) it currently doesn't facilitate this as some of the transformation algorithms are perhaps over eager when it comes to enforcing cancellation.
The question for me is whether we could facilitate this level of control for the programmer with AsyncSequence types. Whereas with the current behaviour this was the only choice in the Rx world, maybe with 'cooperative cancellation' property of Swift's structured concurrency, we can offer a genuinely useful point of difference.
That is a great explanation. Just because other reactive frameworks halt on dispose or cancellation doesn't mean that Swift's AsyncSequence type should inherit that behavior as we have a really great tool to propagate cancellation via structured concurrency or do it even manually with unstructured tasks where not otherwise possible.
Let me clarify this here. All root AsyncSequences should be in line with how cancellation in straight line async code works as well. That is to say that if there are elements buffered they will go ahead and happily return elements even if the task is cancelled. However once you create a continuation you have to handle cancellation. Since at that point you have no idea when the next element is going to be produced.
In your example @DevAndArtist, if your task is cancelled and your stream still has elements buffered it should return those. However once there is no element buffered and the task is cancelled it must return nil
I only partly agree here. There's no such things as "should" on the consumption part. Even if the stream (cancelled, terminated or not) has unconsumed elements, the consumer can opt out earlier. If the stream is no longer needed, we can discard it via deallocation (eventually).
Yes and no. The decision whether the stream should terminate, at least in case of the Async[Throwing]Stream should be driven by the producer, not the middle man (the stream / buffer in this case). The Async[Throwing]Stream is a transformation wrapper type just like AsyncMapSequence is one, and the stream's base would be the producer of the element.
From my experience of maintaining Combine (and some other systems like NSOperation); truth be told cancellation is the source of all sorts of gnarly bugs for folks.
Making root interoperation with the language system for cancellation makes a lot of sense; now not all AsyncSequence adopting types need to use handler cancellation; it is perfectly reasonable to notice the cancel at the next available point. For example AsyncBytes (swift-async-algorithms/AsyncBufferedByteIterator.swift at main · apple/swift-async-algorithms · GitHub is effectively the same backing engine) will keep iterating the current buffer and once it reloads it will interrogate if it needs to respond to cancellation.
In the initial designs AsyncSequence had a cancel method (just like Combine does). The issue became that ends up forcing everything to have pseudo referential semantics. It ended up that looking at the performance and ergonomics of how that panned out for things like .map were rather unsavory.
That all being said: is it out of the question for a base AsyncSequence to have an auxiliary cancel? No; it is perfectly reasonable. Does it apply to all AsyncSequences? From the initial design, to the systems I've been working on lately the answer is a resounding no from what I've seen.
I think this is what's being said though, @DevAndArtist has a sequence that requires elements being 'batched' in certain groups. This is very much possible with straight line code. And a custom source sequence could be designed in this way. It would even satisfy the rules of 'best effort cancellation'.
The issue is that the intermediary algorithms then break that property. And maybe they don't always need to cancel so keenly.
Of course not all algorithms would be able to postpone cancellation indefinitely (debounce and the like), and I think that's what makes this complex, but it's certainly interesting to me, that if we can propagate Task cancellation to the source anyway, would it not be a useful thing to allow that source sequence to then determine exactly how that cancellation takes place.
In fact, it doesn't have to be even the very root sequence. The cancellation must only walk up the chain until it finds the first sequence type which can properly respond to the cancellation.
If we want to view this more visually it should be abstractly mentally modeled as following:
I haven’t used a lot async streams yet so I was confused this thread but after reading this explanation it’s pretty clear now. The current behaviour feels wrong and makes cooperative cancellation harder to explain.
Circling back to all points I was trying to make. Let's for a second assume Swift might get async generators in the future. Wouldn't those be cooperative by default and design?
func foo() async throws -> stream Value {
yield value1
yield value2
if someCondition {
throw someError
}
await doSomeLongWork()
if Task.isCancelled {
yield lastValue // do not throw, return a last value
return // finish the generator earlier
}
yield value3
}
If I'm not totally mistaken, this hypothetical example just shows that AsyncSequence types should obey being cooperative on cancellation by design as well.
If this quoted technique and some other existing once aren't sufficient enough to make AsyncSequence types cooperative, then it just shows that we should aim to filling those holes as soon as possible and not over eager force abrupt cancellation through discarding or disconnection from the production side.
Have been thinking about all of this as well a bit more.
The point I definitely agree with is that algorithms should not implement their own cancellation unless it makes sense for the algorithm, e.g. debounce needs to handle cancellation since it should return as soon as it notices the consumer is cancelled.
However, there are some algorithms currently that have to handle cancellation on their own due to the limitations of how AsyncSequences are shaped up, e.g. merge needs to create an unstructured task and consume the upstreams in their. This results in the cancellation not being handled by the upstreams but rather by merge itself. (If we get generators this is something that we should be able to fix)
Furthermore, I think it is very important that we agree on the fact that cooperative cancellation is forceful from the consumer point of view. That means if the consuming task of either an AsyncSequence or a hypothetical generator gets cancelled, then the sequence/generator should end production as soon as possible. In detail, it depends on the concrete sequence/generator but latest the first time they get to a suspension point they have to check for cancellation.
Applying this concept to your generator example
func foo() async throws -> stream Value {
yield value1
yield value2
if someCondition {
throw someError
}
// Assuming do some long running work creates a continuation
// to adhere to the always make forward progress rule of Swift Concurrency.
// Then it would also handle cancellation.
try await doSomeLongWork()
if Task.isCancelled {
yield lastValue // do not throw, return a last value
return // finish the generator earlier
}
yield value3
}
fun doSomeLongRunningWork() async throws {
// Task.sleep handles task cancellation and throws on cancel
try await Task.sleep(for: .seconds(60)
}
I think the key phrase here is 'as soon as possible'. I do think that this should be on the programmer, the end user of AsynSequence pipelines, to decide and implement that 'as soon as possible'.
As the documentation for Task states:
Tasks include a shared mechanism for indicating cancellation, but not a shared implementation for how to handle cancellation. Depending on the work you’re doing in the task, the correct way to stop that work varies. Likewise, it’s the responsibility of the code running as part of the task to check for cancellation whenever stopping is appropriate.
In that spirit, I feel that the plumbing provided for structured concurrency (via AsyncSeqeunce) should defer responsibility for cancellation to the programmer in their concrete use case wherever and whenever that's feasible.
It does place an additional burden of responsibility on the documentation to clearly state what behaviour any one sequence has in regards to cancellation, but it seems a worthwhile cost and consideration.
Many AsyncSequences behave correctly anyway (filter, map) so it's just something additional to consider when creating new algorithms, and some tweaks/documentation for existing ones.
I agree with this. We should leave it to the consumer of an AsyncSequence to handle cancellation as well, e.g. they can check Task.isCancelled on every element the sequence produced and decide to terminate:
for await element in await someSequence {
if Task.isCanclled { return }
}
However, there is one important point in time where the implementation of AsyncSequences MUST handle cancellation and that is when they take a continuation. The reason for this is that at that point in time the consumer can't handle it on their own since they are still waiting for the next element.
A concrete example with AsyncStream:
P = Producer
C = Consumer
P yields "a"
P yields "b"
C awaits `next` -> consumes "a"
C tasks gets cancelled
C awaits `next` -> consumes "b" // This still gets returned since it was buffered up
C awaits `next` -> consumes `nil` since nothing is buffered and the task was cancelled
I also agree that the AsyncSyncSequences cancellation behaviour should be re-discussed. Currently, it checks for cancellation on every next() call. Furthermore, I think we should also better document these behaviours in both a general rule set document for AsyncSequences and on the individual AsyncSequences themselves.
I can see that will be the case in many instances, but I'm not sure it will be the case in all instances. But I could wrong.
Here's an example I'm thinking of from the top of my head: say you take a continuation from the consuming Task in your sequence, and then you spawn another detached Task which is managing pulling elements from some upstream sequence, rather than terminate the sequence immediately when you get a cancellation signal via withTaskCancellationHandler, you could simply forward the cancellation responsibility by calling cancel on the detached Task you spawned. The hope being that the upstream would then terminate gracefully and finally emit a nil value (allowing you to finally resume the stashed continuation), and cleanly finish the async sequence mindfully of cooperative cancellation.
I'm happy to see that I was able to spark some re-consideration, or at least some thinking on that particular topic. Cooperative task cancellation is so powerful, and AsyncSequence's should really utilize this to the fullest. This gives them a superior advantage compared to other reactive frameworks, at least those that I ever worked with.
AsyncStream is slightly a strange fella. I would strongly suggest that Async[Throwing]Stream is a buffering wrapper around some producer logic and that the producer logic should be considered as a base sequence generator, just like it's done with map and filter sequences. Thus the cooperative cancellation is signaled to the producer and it will decide how to properly finish the stream 'as soon as possible'.
P = Producer
S = Stream / Buffer
C = Consumer
P yields "a"
[Change] S buffers "a"
P yields "b"
[Change] S buffers "b"
C awaits `next` -> consumes "a"
C tasks gets cancelled
[Change] C forwards cancellation signal to P through S
[Change] P receives cancellation signal and decides to finish S
C awaits `next` -> consumes "b" // This still gets returned since it was buffered up
[Change] P finishes
[Change] S finishes
C awaits `next` -> consumes `nil` since nothing is buffered and the task was cancelled, which caused P to finish S
That example is basically exactly how I was able to workaround the current Async[Throwing]Stream behavior. I had to detach the call to next from the current task. If the parent get's cancelled I forward the signal through the stream's continuation's onTerminate handler, which reaches such a Task that drives the stream. It gracefully shuts down and finishes the stream, which results into nil or an error arriving at the consumer. See the implementation of AsyncCooperativeStream above.
It does respect:
cooperative task cancellation (as soon as possible)