[Pitch] New APIs for Async[Throwing]Stream with backpressure support

Hi Swift Evolution,

Here's a proposal for adding new APIs to Async[Throwing]Stream to support bridging backpressured systems. The proposal PR can be found here.

Introduction

SE-0314 introduced new Async[Throwing]Stream types which act as root asynchronous
sequences. These two types allow bridging from synchronous callbacks such as
delegates to an asynchronous sequence. This proposal adds a new way of
constructing asynchronous streams with the goal to bridge backpressured systems
into an asynchronous sequence.

Motivation

After using the AsyncSequence protocol and the Async[Throwing]Stream types
extensively over the past years, we learned that there are a few important
behavioral details that any AsyncSequence implementation needs to support.
These behaviors are:

  1. Backpressure
  2. Multi/single consumer support
  3. Downstream consumer termination
  4. Upstream producer termination

In general, AsyncSequence implementations can be divided into two kinds: Root
asynchronous sequences that are the source of values such as
Async[Throwing]Stream and transformational asynchronous sequences such as
AsyncMapSequence. Most transformational asynchronous sequences implicitly
fulfill the above behaviors since they forward any demand to an underlying
asynchronous sequence that should implement the behaviors. On the other hand,
root asynchronous sequences need to make sure that all of the above behaviors
are correctly implemented. Let's look at the current behavior of
Async[Throwing]Stream to see if and how it achieves these behaviors.

Backpressure

Root asynchronous sequences need to relay the backpressure to the producing
system. Async[Throwing]Stream aims to support backpressure by providing a
configurable buffer and returning
Async[Throwing]Stream.Continuation.YieldResult, which contains the current
buffer depth, from the yield(_:) method. However, only providing the current
buffer depth on yield(_:) is not enough to bridge a backpressured system into
an asynchronous sequence since this can only be used as a "stop" signal but we
are missing a signal to indicate resuming the production. The only viable
backpressure strategy that can be implemented with the current API is a timed
backoff where we stop producing for some period of time and then speculatively
produce again. This is a very inefficient pattern that produces high latencies
and wastes resources.
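
To make that concrete, here is a minimal sketch of the timed-backoff pattern described above, assuming the existing yield(_:)/YieldResult API; the capacity threshold and sleep duration are made-up values:

func produce(_ value: Int, into continuation: AsyncStream<Int>.Continuation) async throws {
    let result = continuation.yield(value)

    if case .terminated = result {
        return // The stream is finished; stop producing.
    }

    if case .enqueued(let remaining) = result, remaining > 1 {
        return // There is still capacity; the caller can keep producing.
    }

    // The buffer is full (or the element was dropped). There is no "resume" signal,
    // so the only option is to back off for an arbitrary period and try again later.
    try await Task.sleep(nanoseconds: 100_000_000)
}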

Multi/single consumer support

The AsyncSequence protocol itself makes no assumptions about whether the
implementation supports multiple consumers or not. This allows the creation of
unicast and multicast asynchronous sequences. The difference between a unicast
and multicast asynchronous sequence is whether they allow multiple iterators to be
created. AsyncStream does support the creation of multiple iterators and it
does handle multiple consumers correctly. On the other hand,
AsyncThrowingStream also supports multiple iterators but calls fatalError
when more than one iterator has to suspend. The original proposal states:

As with any sequence, iterating over an AsyncStream multiple times, or
creating multiple iterators and iterating over them separately, may produce an
unexpected series of values.

While that statement leaves room for any behavior, we learned that a clear distinction
of behavior for root asynchronous sequences is beneficial, especially when it comes
to how transformation algorithms are applied on top.
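
A minimal sketch of the divergence described above, assuming the behavior stated in this pitch; the element type and never-yielding closure are illustrative:

// Both stream types allow creating multiple iterators, but their behavior diverges
// when more than one consumer has to suspend at the same time.
let throwingStream = AsyncThrowingStream<Int, Error> { _ in
    // Never yields or finishes, so every consumer suspends in next().
}

Task { for try await _ in throwingStream { } } // First consumer suspends.
Task { for try await _ in throwingStream { } } // Second consumer suspends -> fatalError, per the pitch.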

Downstream consumer termination

Downstream consumer termination allows the producer to notify the consumer that
no more values are going to be produced. Async[Throwing]Stream does support
this by calling the finish() or finish(throwing:) methods of the
Async[Throwing]Stream.Continuation. However, Async[Throwing]Stream does not
handle the case that the Continuation may be deinited before one of the
finish methods is called. This currently leads to async streams that never
terminate. The behavior could be changed but it could result in semantically
breaking code.
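
A minimal sketch of this gap, assuming an async context for the consuming loop; the element value is illustrative:

// The continuation is only used inside the build closure and finish() is never called.
// Once the last reference to the continuation goes away, the stream never terminates.
let stream = AsyncStream<Int> { continuation in
    continuation.yield(1)
    // No finish() here: after the buffered value is consumed, the loop below
    // suspends forever instead of ending.
}

for await value in stream {
    print(value) // Prints 1, then next() never returns.
}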

Upstream producer termination

Upstream producer termination is the inverse of downstream consumer termination
where the producer is notified once the consumption has terminated. Currently,
Async[Throwing]Stream does expose the onTermination property on the
Continuation. The onTermination closure is invoked once the consumer has
terminated. The consumer can terminate in four separate cases:

  1. The asynchronous sequence was deinited and no iterator was created
  2. The iterator was deinited and the asynchronous sequence is unicast
  3. The consuming task is canceled
  4. The asynchronous sequence returned nil or threw

Async[Throwing]Stream currently invokes onTermination in all cases; however,
since Async[Throwing]Stream supports multiple consumers (as discussed in the
Multi/single consumer support section), a single consumer task being canceled
leads to the termination of all consumers. This is not expected from multicast
asynchronous sequences in general.
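
A minimal sketch of that behavior, assuming the current AsyncStream semantics described above; the two consumer tasks are illustrative:

let stream = AsyncStream<Int> { continuation in
    continuation.onTermination = { reason in
        // With the current implementation this fires as soon as the first
        // consumer task is cancelled.
        print("terminated: \(reason)")
    }
}

let consumer1 = Task { for await _ in stream { } }
let consumer2 = Task { for await _ in stream { } }
consumer1.cancel() // Per the motivation above, this also ends iteration for consumer2.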

Proposed solution

The above motivation lays out the expected behaviors from a root asynchronous
sequence and compares them to the behaviors of Async[Throwing]Stream. These
are the behaviors where Async[Throwing]Stream diverges from the expectations.

  • Backpressure: Doesn't expose a "resumption" signal to the producer
  • Multi/single consumer:
    • Divergent implementation between throwing and non-throwing variant
    • Supports multiple consumers even though proposal positions it as a unicast
      asynchronous sequence
  • Consumer termination: Doesn't handle the Continuation being deinited
  • Producer termination: Happens on first consumer termination

This section proposes new APIs for Async[Throwing]Stream that implement all of
the above-mentioned behaviors.

Creating an AsyncStream with backpressure support

You can create an AsyncStream instance using the new makeStream(of:backPressureStrategy:) method. This method returns you the stream and the
source. The source can be used to write new values to the asynchronous stream.
The new API specifically provides a multi-producer/single-consumer pattern.

let (stream, source) = AsyncStream.makeStream(
    of: Int.self,
    backPressureStrategy: .watermark(low: 2, high: 4)
)

The new proposed APIs offer three different ways to bridge a backpressured
system. The foundation is the multi-step synchronous interface. Below is an
example of how it can be used:

do {
    let writeResult = try source.write(contentsOf: sequence)
    
    switch writeResult {
    case .produceMore:
       // Trigger more production
    
    case .enqueueCallback(let callbackToken):
        source.enqueueCallback(callbackToken: callbackToken, onProduceMore: { result in
            switch result {
            case .success:
                // Trigger more production
            case .failure(let error):
                // Terminate the underlying producer
            }
        })
    }
} catch {
    // `write(contentsOf:)` throws when the asynchronous stream has already terminated
}

The above API offers the most control when bridging a synchronous producer to an
asynchronous sequence. First, you have to write values using the
write(contentsOf:) method, which returns a WriteResult. The result either indicates
that more values should be produced or that a callback should be enqueued by
calling the enqueueCallback(callbackToken:onProduceMore:) method. This callback
is invoked once the backpressure strategy decides that more values should be
produced. This API aims to offer the most flexibility with the greatest
performance. The callback only has to be allocated in the case where the
producer needs to be suspended.

Additionally, the above API is the building block for some higher-level and
easier-to-use APIs to write values to the asynchronous stream. Below is an
example of the two higher-level APIs.

// Writing new values and providing a callback for when to produce more
try source.write(contentsOf: sequence, onProduceMore: { result in
    switch result {
    case .success:
        // Trigger more production
    case .failure(let error):
        // Terminate the underlying producer
    }
})

// This method suspends until more values should be produced
try await source.write(contentsOf: sequence)

With the above APIs, we should be able to effectively bridge any system into an
asynchronous stream, regardless of whether the system is callback-based, blocking, or
asynchronous.
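
For instance, a blocking or pull-based producer could be bridged with the suspending write. This is a hedged sketch against the proposed API, where makeNextBatch() is a hypothetical synchronous function that returns nil once the upstream is exhausted:

let producerTask = Task {
    do {
        while let batch = makeNextBatch() {
            // Suspends whenever the backpressure strategy asks us to stop producing.
            try await source.write(contentsOf: batch)
        }
        // Signal downstream consumer termination once the upstream is exhausted.
        source.finish()
    } catch {
        // The stream already terminated; stop the underlying producer.
    }
}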

Detailed design, API/ABI compatibility, future directions, alternatives

Please take a look at the evolution PR which includes the full proposal.

Looking forward to hearing what everyone thinks about this!

Franz


(disclaimer: I was heavily involved in the problem statement and design of this, so I may be biased)

To me, this is a must have, therefore strong +1.

At this point in time AsyncStream is quite dangerous to use because it doesn't enforce backpressure (unless you configure it to drop elements, which is often not acceptable). Especially in server use cases, if Async(Throwing)Stream were used with data fed from the network, it usually means a denial-of-service security vulnerability. This is because it often allows the attacker to feed data faster than it's consumed, which means it balloons in the server's memory (a lack of backpressure). Therefore, until now I cautioned people against touching Async(Throwing)Stream unless they could prove ahead of time that the data fed into the stream is of a small, finite size that is not attacker-controlled. This proposal fixes this issue and therefore addresses my main concern.

It also addresses shortcomings in consumer-to-producer as well as producer-to-consumer cancellation (called "termination" in the proposal) support that are crucial for most AsyncSequences. That's also necessary because if cancellation doesn't work both ways it's pretty easy to have a bunch of Async(Throwing)Streams lying around that will never make progress.

Thanks so much for polishing, finishing and writing all this up @FranzBusch.


I love the proposal; I'm especially excited by the idea that consumers could tell the sequence to stop producing in advance. This is great.

However, I'm slightly confused: when reading the code example, it almost sounds like the normal way to stop producing is to call the write() methods and let them throw errors to indicate the end of the stream. If that's the case, wouldn't it be better to just add a WriteResult case telling us to end the sequence, something like this?

        /// A type that indicates the result of writing elements to the source.
        @frozen
        public enum WriteResult: Sendable {
            /// A token that is returned when the asynchronous stream's backpressure strategy indicated that any producer should
            /// be suspended. Use this token to enqueue a callback by calling the ``enqueueCallback(_:)`` method.
            public struct CallbackToken: Sendable {}
            /// Indicates the sequence has already ended and nothing should be produced or enqueued.
            case stopProduction
            /// Indicates that more elements should be produced and written to the source.
            case produceMore
            /// Indicates that a callback should be enqueued.
            ///
            /// The associated token should be passed to the ``enqueueCallback(_:)`` method.
            case enqueueCallback(CallbackToken)
        }

Similarly, the callback for the enqueueCallback method takes a Result<Void, Error> as a parameter to indicate the end of production. I can imagine a case where we stop consumption because the callee mishandled the sequence, so perhaps we need a failure case, but does it make sense to throw an error just because the caller broke out of consuming the elements in the sequence?

If I'm completely off the mark please do tell!

Thanks for providing feedback!

We have to be explicit here how we want to stop producing. In the proposal, I called this:

  1. Downstream consumer termination
  2. Upstream producer termination

Case 1 happens when a producer calls source.finish() to indicate that it has no more elements to write. An example might be an HTTP request body where the producer indicates the end, which should lead the asynchronous stream to terminate after the last value has been consumed. For the consumer this just means that after next() has returned the last value, any subsequent call will return nil.
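
A small hedged sketch of case 1 against the proposed API (element values illustrative, consumer loop assumed to run in an async context):

// Producer side: write the last elements and signal that no more will come.
_ = try source.write(contentsOf: [1, 2, 3])
source.finish()

// Consumer side: the buffered elements are still delivered, then next() returns nil
// and the loop ends.
for await element in stream {
    print(element) // 1, 2, 3
}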

For case 2, we want to stop the production side when we know that there is no consumer left. We can recognise that no consumer is left in the following scenarios:

  1. The Async[Throwing]Stream was deinited and no iterator was created
  2. The iterator was deinited (since the new interface provides a unicast Async[Throwing]Stream)
  3. The consuming task is canceled

To your question why the WriteResult doesn't return this: upstream producer termination can happen concurrently with us producing new values. Therefore, you can provide an onTermination closure to the new makeStream() method which will be called once we detect that we need to terminate the producer.
Additionally, the various write() methods will throw an error when the sequence is terminated.

Similarly, here we do complete the Result with an error when the sequence has terminated. This allows us to cancel the underlying production system. Using Result here is definitely debatable, but we need a type that can indicate produceMore and terminateProducing. I am open to introducing a new enum for this instead of using a Result and an Error.
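
For illustration, such an enum could look roughly like this; a hedged sketch where the type and case names are hypothetical, not part of the pitch:

/// Hypothetical replacement for Result<Void, Error> in the onProduceMore callback.
public enum ProduceMoreResult: Sendable {
    /// The backpressure strategy wants more elements to be produced.
    case produceMore
    /// The stream has terminated; the underlying producer should be cancelled.
    case terminateProducing
}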

For the async write methods however I think using the standard error propagation pattern via throws is correct.


But do the various write() methods throw an error if you call them and there are no consumers, or do they only throw an error if you explicitly called finish() on the stream? If it's the latter, I would recommend adding this to the example so we know how to stop a stream without having to handle an error.

Would one of the errors thrown by write() be a TaskCancelled error thrown on the consumer side or would it be a TaskCancelled thrown on the source side? This is what I'm confused about.

Maybe a stupid question, but how does that pitch relate to AsyncChannel? It seems to be mostly about adding backpressure to AsyncStreams.

AsyncChannel can only provide backpressure between two Swift Tasks. If you are producing from a non-async context then it cannot be used. This is because AsyncChannel.send is async. This is not true of AsyncStream.Continuation or this proposal here, which can both be used from non-async contexts.

Additionally, AsyncChannel has an effective internal buffer size of 1: a call to send suspends until the value is dequeued from next. That's not always optimal.
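
A small sketch of the contrast, using the existing AsyncChannel API from swift-async-algorithms (values illustrative):

import AsyncAlgorithms

let channel = AsyncChannel<Int>()

// Producing requires an async context: send(_:) suspends until the element
// is consumed (or the channel is finished/cancelled).
Task {
    await channel.send(1)
    channel.finish()
}

// Consumer side.
Task {
    for await value in channel {
        print(value)
    }
}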


I just updated the pitch to include more information around how termination works. @rdemarest, could you give the two termination sections that I added a look and let me know if that clarifies your questions? I haven't yet decided on what concrete error to throw from the various write methods when trying to write to a finished source.

@lukasa already answered this nicely and I added a section that outlines the differences to AsyncChannel and NIOAsyncSequenceProducer. The only difference to Cory's statement is that I classified AsyncChannel as having a buffer of 0, since it never really buffers the elements but rather suspends any producer until the value has been consumed.


Looks great thank you very much!

I can't wait to get my hands on this!


I need more time to digest and think about this pitch, but here is some initial feedback.

  • I find it rather confusing when the naming scheme moves away from the Continuation and yield names. Many other APIs use these names, and they are clearly understood. It's strange to start using write instead of yield just because we need more API space here. On the other hand, we still stick with finish; that's an interesting but odd inconsistency.

  • Does this new API address anything regarding cooperative cancellation from the consumer's end? I think we clearly identified the problem with the eager cancellation behavior of some async sequences in this past thread.

The proposal describes when the stream will terminate, but it does not describe how values will be produced after the termination signal.

This example seems to have a typo. I think it should be stream not source that is iterated over.

// Termination through task cancellation
let (stream, source) = AsyncStream.makeStream(
    of: Int.self,
    backPressureStrategy: .watermark(low: 2, high: 4)
)

let task = Task {
    for await element in source {
                         ^~~~~~ here!
    }
}
task.cancel()

The APIs do not really provide a way to listen to the cancellation signal. So if the above task is cancelled, I expect that information to bubble up through the stream to the source's element producer. The producer that feeds the source should decide if it still wants to yield more values or if it wants to eagerly shut down the stream, but the stream shouldn't shut down immediately on its own. That would go against cooperative cancellation.

Here's a refresher on task cancellation from the recent WWDC session.

[...] Task cancellation is used to signal that the app no longer needs the result of a task and the task should stop and either return a partial result or throw an error. [...] Cancellation is cooperative, so child tasks aren't immediately stopped. It simply sets the "isCancelled" flag on that task. Actually acting on the cancellation is done in your code. Cancellation is a race. [...] Remember, cancellation does not stop a task from running, it only signals to the task that it has been cancelled and should stop running as soon as possible. It is up to your code to check for cancellation.

  • onTermination feels like it is potentially in the wrong place, or I misunderstand the design concept here.

If I wrap a Task with an AsyncStream, I want to propagate the cancellation coming from onTermination to my Task instance. With these APIs that use case becomes somewhat harder to write.

Task {
  let (stream, source) = AsyncStream.makeStream(
    of: Int.self,
    backPressureStrategy: .watermark(low: 2, high: 4),
    onTermination: { /* I want to cancel the producing task from here */ }
  )
  // the stream is captured within the task itself 🤔
}

Here's my personal workaround to patch the eager cancellation from AsyncStream into a cooperative cancellation:

public struct AsyncCooperativeStream<Element> {
  let continuation: AsyncStream<Element>.Continuation
  let stream: AsyncStream<Element>

  public init(
    _ elementType: Element.Type = Element.self,
    bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded,
    _ build: (AsyncStream<Element>.Continuation) -> Void
  ) {
    var streamContinuation: AsyncStream<Element>.Continuation! = nil
    let stream = AsyncStream<Element>(elementType, bufferingPolicy: limit) { continuation in
      build(continuation)
      streamContinuation = continuation
    }
    self.continuation = streamContinuation
    self.stream = stream
  }
}

extension AsyncCooperativeStream: AsyncSequence {
  public struct Iterator: AsyncIteratorProtocol {
    let continuation: AsyncStream<Element>.Continuation

    // NOTE: This is `@unchecked Sendable` because `AsyncStream._Context` just captures
    // `AsyncStream._Storage` which itself is `@unchecked Sendable`, so we're safe here.
    struct _Box: @unchecked Sendable {
      let iterator: AsyncStream<Element>.Iterator
    }
    let box: _Box

    public mutating func next() async -> Element? {
      let box = self.box
      return await withTaskCancellationHandler {
        await withCheckedContinuation { continuation in
          // Detach the `next` method from the current parent task.
          Task.detached {
            var mutableIterator = box.iterator
            continuation.resume(returning: await mutableIterator.next())
          }
        }
      } onCancel: { [continuation] in
        // Forward the cancellation manually to the termination handler, then remove it so that
        // during a subsequent `next` call we do not signal another cancellation.
        let handler = continuation.onTermination
        continuation.onTermination = nil
        handler?(.cancelled)
      }
    }
  }

  public func makeAsyncIterator() -> Iterator {
    Iterator(continuation: continuation, box: Iterator._Box(iterator: stream.makeAsyncIterator()))
  }
}

Usually the setup is something like this:

AsyncCooperativeStream { continuation in
  let task = Task {
    defer {
      continuation.finish()
    }

    ... // up to me to decide on how to check and respond to cancellation
  }

  continuation.onTermination = { @Sendable _ in
    if !task.isCancelled {
      task.cancel()
    }
  }
}
  • At first glance, the watermark part is somewhat confusing. This is especially true if we want this API to replace the old one. The developer should not care about that part when initially using an AsyncStream; therefore, I think we should at least provide some kind of general default value for this.

First off. Thanks for the great feedback!

I was expecting the name bikeshedding here and I fully welcome it. I tried to outline the reason for moving to a different name in the Alternatives considered section. The main reason for moving away from Continuation is IMO that it is super confusing compared to the Checked/UnsafeContinuation APIs. The latter mean that a task is suspended and you can use them to resume it. The AsyncStream.Continuation is just an interface to deliver new values to the stream. There might be no continuation here at all.
Similarly, yield() is also very overloaded by having Task.yield and the yield keyword. The former gives up the current task for work to interleave and the latter is used in the _modify/_read accessors.

Overall, I think it is worth revisiting the naming here and at least discussing it and see if we can reduce the confusion when trying to teach developers the concepts of continuations and async sequences.

Glad that you brought it up! You linked the talk further down in your reply and I want to add another quote from it

Task cancellation is used to signal that the app no longer needs the result of a task and the task should stop and either return a partial result or throw an error.

The reason why I haven't exposed functionality that leaves the decision of finishing the async stream to the producer when consumer cancellation happens is that IMO this is not up to the producer. The task got cancelled and it should finish ASAP. The producer shouldn't be able to block cancellation by never finishing the stream.

The doc comments of the finish method outline the behavior:

After all buffered elements are consumed the next iteration point will return nil or throw an error.

I agree on this one. I tried to move it to the factory method but realised myself that we should make it a property on the source instead to avoid lazy initialisation.

This is exactly what this proposal tries to remedy. The current APIs let the user set a buffer strategy, but that brings little to no value with how the current buffer depth is communicated to the user. I disagree that developers should not care about the backpressure strategy, since it is an essential property of asynchronous sequences and heavily affects performance.
Introducing an adaptive backpressure strategy and making it the default could make sense here. We could also add a none backpressure strategy which users have to opt into.

I remain in strong disagreement on that behavior. "Cooperative" cancellation means it's up to the task / producer to decide how to react to the cancellation signal. As I mentioned many times in the other thread, the task might not cancel its operations at all if that's the behavior the developer needs.

let task = Task {
  while true {
    ... // does not check for cancellation here, we want to break only
    ... // when everything has been fully computed / processed
  }
}
task.cancel() // will have no effect

This shouldn't be any different with AsyncSequences. This flexibility is what enables the use of Swift's cooperative task cancellation to implement graceful and deterministic shutdown operations. If that doesn't remain the same for AsyncSequences, it will be a hugely painful experience for some applications / use cases due to this inconsistency of behaviors.

In our code base, it's extremely important that the arrival of a new value shuts down the previously created task. However, that task must fully complete its remaining critical operations before we can spawn a new task. If that weren't the case, we would spawn a new task and leave some parts of our program in weird incomplete states. That would be the consequence of the eager shutdown from AsyncStream.

I'm not arguing that eager cancellation is bad; it's a valuable and needed tool, but it probably shouldn't be the default for most AsyncSequences.

I still don't believe exposing this on AsyncStream gains us much. You are focusing on one API here where you want control to check if something should be cancelled or not; however, in fact there are many APIs that will just cancel when their calling task gets cancelled. An example of this in the standard library is Task.sleep(). If you called Task.sleep() in your while loop example, you would have the same problem. There are many more such APIs in packages or the Apple SDKs.
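
For illustration, a minimal sketch of that point (the delay value is arbitrary):

let task = Task {
    while true {
        // The loop never checks isCancelled itself, but Task.sleep is
        // cancellation-aware and throws CancellationError once the task is cancelled,
        // so the loop still exits early.
        try await Task.sleep(nanoseconds: 1_000_000_000)
    }
}
task.cancel()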

IMO by now the fact is that any async API might return early/throw an error on task cancellation and there is no way to avoid this. However, I do agree that users should have more control here, especially when you want to model something like a guaranteed async shutdown. I could see us adding a method that acts as a cancellation barrier, so that code like this can safely work:

func withScopeAccessToFoo(_ closure: (Foo) async -> Void) async {
  let foo = Foo()
  await foo.start()
  await closure(foo)
  
  await withCancellationBarrier {
    await foo.shutdown()
  }
}

In general, I don't agree that cooperative cancellation is up to the producer; rather, it is up to the consumer here. With the above cancellation barrier you should be able to model this from the consumer side.

If you or any other readers are into analogies: if you want to brake a fast-moving vehicle you have two options. You stop the vehicle over time, as we have to decelerate it first, or we stop it immediately. However, the immediate halt probably wouldn't be very healthy for the passengers, as inertial dampers are either impossible or extremely hard to create. It's the same in your code base: the eager cancellation might not be desired, as it will potentially cause issues that could be avoided if the cancellation is unwound by signaling the producer that we're no longer interested in any new values. The producer will then decide if, how, when and what it should return.

I understand and am aware that many APIs are implemented with eager cancellation, but that's up to those APIs to decide. They were not forced to do so by design. They all had the choice of how they want to react to cancellation. If we kept AsyncSequences as they are right now, we'd have a strange situation where this choice is removed from us in several places. It quickly becomes a source of surprises which could lead to issues.

For most APIs there isn't really a choice. They would become way more convoluted if they exposed to the user how to handle task cancellation.

In general, I think the problem you describe is very specific to how your program is modelled. You want to use task cancellation to indicate that something should gracefully shut down. This is not what task cancellation is or how it has been implemented. I would recommend taking a look at how swift-service-lifecycle has introduced the concept of graceful shutdown. This is closer to the semantics that you want and is fully modelled on top of Structured Concurrency.

I remain in disagreement here. You're saying that the async chain of next() operations should not follow cooperative cancellation and should outright ignore it. That goes against everything that went into the design of structured concurrency in Swift. All of that becomes completely worthless if AsyncSequence maintainers decide that their next() methods should not operate in the same manner.

For me personally that means that I cannot rely on and use many of the async sequence algorithms, as they will not work as expected in terms of async task cancellation. I would need to reinvent the wheel here, because the default is shifted for next() methods.

That is not what I am saying, especially not when it comes to algorithms. From the proposal:

Most transformational asynchronous sequences implicitly
fulfill the above behaviors since they forward any demand to an underlying
asynchronous sequence that should implement the behaviors

This is super important and we are trying to uphold this behavior in all our async algorithm implementations. In the end, the root asynchronous sequences are implementing the cancellation by wrapping the withCheckedContinuation calls in a withTaskCancellationHandler.
What is important to me here is that cancellation is not something that the producer of the AsyncStream controls but rather the consumer.

I think we both outlined our current thinking on this quite extensively here. I would like to hear from other people about their experience with the current semantics.

Yes, I also want to make clear that I'm not trying to be disrespectful here. I'm just trying to provide some valuable feedback as a heavy API user, while I get countered by the API designer(s). At least I hope my feedback is valuable. This results in a back and forth potentially without any good resolution on the horizon, which gets tiring really fast. Yes, opinionated things happen in life.

For me AsyncStream is an intermediate async sequence: it provides an entrance into the structured async world by creating a bridge between different paradigms. As a developer of the producer side, I would like to gain the superpowers of async cancellation and apply them to my logic however I see fit, not however AsyncStream's buffer behaves on a cancellation signal. Since AsyncStream for me is an intermediate part of the chain, I would like to hand the responsibility for the cancellation behavior up the chain and not have it on AsyncStream. Therefore my code requires that the buffer not be terminated immediately on a cancellation signal coming from the consumer, but rather according to the way the upstream / producer reacts to it. Whether it's eager or graceful is up to the producing upstream.


An example sequence:

struct AtLeast4Numbers: AsyncSequence { ... }

let task = Task {
  for await number in atLeast4Numbers {
    print(number)
    // non cancellable sleep of N seconds
  }
}
task.cancel()

Even though that task is cancelled and the cancellation reaches the next method of the AtLeast4Numbers.AsyncIterator, the sequence keeps its promise of delivering at least 4 numbers even if it gets cancelled before that point in time. Since the consumer did not explicitly opt in to eager cancellation by manually checking for task cancellation, all values will be printed as expected. This is the true promise of cooperative task cancellation applied to AsyncSequences.

If that default were flipped for AsyncSequences, or not documented, I wouldn't be able to trust AtLeast4Numbers.


TL;DR: I think we could solve this problem by moving onTermination to the source and also adding onCancellation to the source. onCancellation should just fire the closure once when the iterator first detects the task cancellation, and that's it. It should not change any state; let the implementation in the closure handle that. If such a closure is not provided, then that particular AsyncStream instance should not react to cancellation signals at all.

let (stream, source) = AsyncStream.makeStream(of: Int.self, backPressureStrategy: ...)
// PRODUCER - Somewhere nearby.

let producerTask = Task {
  source.write(...)
  // check for cancellation and handle it however needed!
  // when completed causes `onTermination` to fire
}

source.onCancellation = {
  producerTask.cancel()
}
source.onTermination = { /* some final clean up */ }

// CONSUMER - Somewhere else!
let consumerTask = Task {
  for await element in stream {
    ...
  }
}
consumerTask.cancel() // causes `source.onCancellation` to fire once!

Now one could ask - Why aren’t we using cancellation in the first place here? The problem is that cancellation is forceful and doesn’t allow users to make a decision if they want to cancel or not. However, graceful shutdown is very specific to business logic often. In our case, we were fine with just stopping to handle new requests on a stream. Other applications might want to send a response indicating to the client that the server is shutting down and waiting for an acknowledgment of that message.

  • "cancellation is forceful" - What about cancellation is forceful besides that a concurrent task is marked as cancelled?! The HOW to react to the cancellation is not enforced and therefore not a requirement as cancellation is a race.
  • "doesn’t allow users to make a decision if they want to cancel or not" - This is only partly true in my opinion. The user is notified that the task is no longer needed and we would like to cancel it and the user still has the chance to handle that. There's no recovery from the state change though, but the user is free to ignore the signal.
  • "Other applications might want to send a response indicating to the client that the server is shutting down and waiting for an acknowledgment of that message." - That's fine, but that's only a different type of a graceful shutdown for another use case.

That said, I think we also misunderstood each other on the term "graceful shut down". I believe that there are multiple kinds of it: one that is required (cancellation) and one that is optional (a signal, with the ability to fully reject it). I think I understand your point more from the second option's perspective, but in all the previous discussion I was rather providing feedback on required graceful shutdown via cancellation. I think both use cases are perfectly valid.

Therefore, required graceful shutdown via cancellation is like "please return or error, I will wait until you (the task) resume", while optional graceful shutdown is more like "can you cancel / shut down, yes or no?".
