[Pitch] New APIs for Async[Throwing]Stream with backpressure support

I need more time to digest and think about this pitch, but here is some initial feedback.

  • I find it rather confusing when the naming scheme moves away from the Continuation and yield names. Many other APIs use these names, and they are clearly understood. It's strange to start using write instead of yield just because we need more API space here. On the other hand, we still stick with finish, which is an odd inconsistency.

  • Does this new API address anything regarding cooperative cancellation from the consumer's end? I think we clearly identified the problem with the eager cancellation behavior of some async sequences in this past thread.

The proposal describes when the stream will terminate, but it does not describe how values will be produced after the termination signal.

This example seems to have a typo. I think it should be stream, not source, that is iterated over.

// Termination through task cancellation
let (stream, source) = AsyncStream.makeStream(
    of: Int.self,
    backPressureStrategy: .watermark(low: 2, high: 4)
)

let task = Task {
    for await element in source {
                         ^~~~~~ here!
    }
}
task.cancel()

The APIs do not really provide a way to listen for the cancellation signal. If the above task is cancelled, I expect that information to bubble up through the stream to the source's element producer. The producer that feeds the source should decide whether it still wants to yield more values or whether it wants to eagerly shut down the stream; the stream shouldn't shut down immediately on its own. That would go against cooperative cancellation.
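
For comparison, the eager behavior of the existing AsyncStream can be observed with a small sketch. It assumes Swift 5.9 or later (for AsyncStream.makeStream(of:)) and top-level await in a main.swift file; the names consumer and received are only illustrative.

```swift
// Sketch of the current (non-pitched) AsyncStream behavior.
let (stream, continuation) = AsyncStream.makeStream(of: Int.self)

// The producer side only learns about cancellation after the stream
// has already been terminated on its behalf.
continuation.onTermination = { @Sendable reason in
    print("terminated:", reason)
}

let consumer = Task {
    var count = 0
    for await _ in stream {
        count += 1
    }
    return count
}

// Cancelling the consuming task finishes the stream right away;
// the producer gets no chance to yield further values.
consumer.cancel()
let received = await consumer.value
print("received \(received) element(s)")
```

Nothing was yielded here, so the loop simply ends; the point is that the termination is decided by the stream, not by the producer.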

Here's a refresher on task cancellation from the recent WWDC session.

[...] Task cancellation is used to signal that the app no longer needs the result of a task and the task should stop and either return a partial result or throw an error. [...] Cancellation is cooperative, so child tasks aren't immediately stopped. It simply sets the "isCancelled" flag on that task. Actually acting on the cancellation is done in your code. Cancellation is a race. [...] Remember, cancellation does not stop a task from running, it only signals to the task that it has been cancelled and should stop running as soon as possible. It is up to your code to check for cancellation.
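
The quoted behavior can be sketched roughly as follows. This is a minimal illustration, not code from the pitch; sumUntilCancelled is a hypothetical helper, and top-level await in a main.swift file is assumed.

```swift
// Cancellation only sets a flag; the task itself decides how to respond.
// Here the (hypothetical) helper stops early and returns a partial result.
func sumUntilCancelled(_ values: [Int]) async -> Int {
    var partial = 0
    for value in values {
        // Cooperative check: nothing stops this loop unless we look at the flag.
        if Task.isCancelled { break }
        partial += value
        await Task.yield()
    }
    return partial
}

let task = Task { await sumUntilCancelled(Array(1...1_000)) }
task.cancel()

// Cancellation is a race, so the partial sum depends on how far the task got.
let result = await task.value
print("partial result:", result)
```

The same choice (finish the current work, return a partial result, or stop at once) is what I would like the producer of a stream to be able to make.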

  • onTermination feels like it is potentially in the wrong place, or I misunderstand the design concept here.

If I wrap a Task with an AsyncStream, I want to propagate the cancellation coming from onTermination to my Task instance. With these APIs, that use case becomes somewhat harder to write.

Task {
  let (stream, source) = AsyncStream.makeStream(
    of: Int.self,
    backPressureStrategy: .watermark(low: 2, high: 4),
    onTermination: { /* I want to cancel the producing task from here */ }
  )
  // the stream is captured within the task itself 🤔
}

Here's my personal workaround to patch the eager cancellation from AsyncStream into a cooperative cancellation:

public struct AsyncCooperativeStream<Element> {
  let continuation: AsyncStream<Element>.Continuation
  let stream: AsyncStream<Element>

  public init(
    _ elementType: Element.Type = Element.self,
    bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded,
    _ build: (AsyncStream<Element>.Continuation) -> Void
  ) {
    var streamContinuation: AsyncStream<Element>.Continuation! = nil
    let stream = AsyncStream<Element>(elementType, bufferingPolicy: limit) { continuation in
      build(continuation)
      streamContinuation = continuation
    }
    self.continuation = streamContinuation
    self.stream = stream
  }
}

extension AsyncCooperativeStream: AsyncSequence {
  public struct Iterator: AsyncIteratorProtocol {
    let continuation: AsyncStream<Element>.Continuation

    // NOTE: This is `@unchecked Sendable` because `AsyncStream._Context` just captures
    // `AsyncStream._Storage` which itself is `@unchecked Sendable`, so we're safe here.
    struct _Box: @unchecked Sendable {
      let iterator: AsyncStream<Element>.Iterator
    }
    let box: _Box

    public mutating func next() async -> Element? {
      let box = self.box
      return await withTaskCancellationHandler {
        await withCheckedContinuation { continuation in
          // Detach the `next` method from the current parent task.
          Task.detached {
            var mutableIterator = box.iterator
            continuation.resume(returning: await mutableIterator.next())
          }
        }
      } onCancel: { [continuation] in
        // Forward the cancellation manually to the termination handler, then remove it so that
        // during a subsequent `next` call we do not signal another cancellation.
        let handler = continuation.onTermination
        continuation.onTermination = nil
        handler?(.cancelled)
      }
    }
  }

  public func makeAsyncIterator() -> Iterator {
    Iterator(continuation: continuation, box: Iterator._Box(iterator: stream.makeAsyncIterator()))
  }
}

Usually the setup is something like this:

AsyncCooperativeStream { continuation in
  let task = Task {
    defer {
      continuation.finish()
    }

    ... // up to me to decide on how to check and respond to cancellation
  }

  continuation.onTermination = { @Sendable _ in
    if !task.isCancelled {
      task.cancel()
    }
  }
}

  • At first glance, the watermark part is somewhat confusing. This is especially true if we want this API to replace the old one. The developer should not have to care about that part when initially using an AsyncStream, so I think we should at least provide some kind of general default value for it.