I need more time to digest and think about this pitch, but here is some initial feedback.
- I find it rather confusing when the naming scheme moves away from the `Continuation` and `yield` names. Many other APIs use these names, and they are clearly understood. It's strange to start using `write` instead of `yield` just because we need more API space here. On the other hand, we still stick with `finish`, which is an odd inconsistency.
- Does this new API address anything regarding cooperative cancellation from the consumer's end? I think we clearly identified the problem with the eager cancellation behavior of some async sequences in this past thread.
The proposal describes when the stream will terminate, but it does not describe how values will be produced after the termination signal.
This example seems to have a typo. I think it should be `stream`, not `source`, that is iterated over.
// Termination through task cancellation
let (stream, source) = AsyncStream.makeStream(
    of: Int.self,
    backPressureStrategy: .watermark(low: 2, high: 4)
)
let task = Task {
    for await element in source {
    //                   ^~~~~~ here!
    }
}
task.cancel()
The APIs do not really provide a way to listen to the cancellation signal. So if the above `task` is cancelled, I expect that information to bubble up through the `stream` to the `source`'s element producer. The producer that feeds the `source` should decide whether it still wants to yield more values or whether it wants to eagerly shut down the stream, but the stream shouldn't shut down immediately on its own. That would go against cooperative cancellation.
Here's a refresher on task cancellation from the recent WWDC session.
> [...] Task cancellation is used to signal that the app no longer needs the result of a task and the task should stop and either return a partial result or throw an error. [...] Cancellation is cooperative, so child tasks aren't immediately stopped. It simply sets the "isCancelled" flag on that task. Actually acting on the cancellation is done in your code. Cancellation is a race. [...] Remember, cancellation does not stop a task from running, it only signals to the task that it has been cancelled and should stop running as soon as possible. It is up to your code to check for cancellation.
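To make the cooperative part concrete, here is a minimal sketch using only `Task`, `Task.isCancelled`, and `Task.yield()` from the standard library. The helper `sumUntilCancelled` is a hypothetical name made up for illustration; it checks the flag itself and returns a partial result, which is roughly the behavior I would like a stream's producer to be able to choose.

```swift
// Hypothetical helper (not part of any pitch): adds up values until it
// notices cancellation, then returns whatever it has summed so far.
func sumUntilCancelled(_ values: [Int]) async -> Int {
    var total = 0
    for value in values {
        // Cancellation only sets a flag; the task has to look at it.
        if Task.isCancelled { break }
        total += value
        // Give other tasks a chance to run between steps.
        await Task.yield()
    }
    return total
}

let task = Task { await sumUntilCancelled(Array(1...1_000)) }
task.cancel()

// The task is not torn down; it runs to its own `return` and hands back
// a partial sum (possibly 0 if the flag is seen before the first addition).
let partial = await task.value
print("partial sum:", partial)
```

The point is that the code inside the task decides what "stop" means, not the thing that requested the cancellation.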
`onTermination` feels potentially in the wrong place, or I misunderstand the design concept here.
If I wrap a `Task` with an `AsyncStream`, I want to propagate the cancellation coming from `onTermination` to my `Task` instance. With these APIs, that use case becomes somewhat harder to write.
Task {
    let (stream, source) = AsyncStream.makeStream(
        of: Int.self,
        backPressureStrategy: .watermark(low: 2, high: 4),
        onTermination: { /* I want to cancel the producing task from here */ }
    )
    // the stream is captured within the task itself 🤔
}
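For comparison, this is roughly how the same wiring looks with the existing closure-based `AsyncStream` initializer, where the continuation is available before the producing task is created. It is only a sketch; the names `numbers`, `producer`, and `received` are made up for illustration, and the producer is kept finite so the example terminates on its own.

```swift
// Uses only the existing `AsyncStream` initializer and its continuation.
let numbers = AsyncStream<Int> { continuation in
    // The producing task is created inside the build closure, so the
    // termination handler below can capture it directly.
    let producer = Task {
        defer { continuation.finish() }
        for value in 0..<3 {
            // The producer decides how to react to cancellation.
            if Task.isCancelled { return }
            continuation.yield(value)
        }
    }
    // Forward termination of the stream to the producing task.
    continuation.onTermination = { @Sendable _ in
        producer.cancel()
    }
}

var received: [Int] = []
for await number in numbers {
    received.append(number)
}
print(received)
```

Because the task handle exists before `onTermination` is assigned, the handler can simply capture it, which is the part that gets awkward when the handler has to be passed to the factory up front.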
Here's my personal workaround to patch the eager cancellation from `AsyncStream` into a cooperative cancellation:
public struct AsyncCooperativeStream<Element> {
    let continuation: AsyncStream<Element>.Continuation
    let stream: AsyncStream<Element>

    public init(
        _ elementType: Element.Type = Element.self,
        bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded,
        _ build: (AsyncStream<Element>.Continuation) -> Void
    ) {
        var streamContinuation: AsyncStream<Element>.Continuation! = nil
        let stream = AsyncStream<Element>(elementType, bufferingPolicy: limit) { continuation in
            build(continuation)
            streamContinuation = continuation
        }
        self.continuation = streamContinuation
        self.stream = stream
    }
}

extension AsyncCooperativeStream: AsyncSequence {
    public struct Iterator: AsyncIteratorProtocol {
        let continuation: AsyncStream<Element>.Continuation

        // NOTE: This is `@unchecked Sendable` because `AsyncStream._Context` just captures
        // `AsyncStream._Storage` which itself is `@unchecked Sendable`, so we're safe here.
        struct _Box: @unchecked Sendable {
            let iterator: AsyncStream<Element>.Iterator
        }
        let box: _Box

        public mutating func next() async -> Element? {
            let box = self.box
            return await withTaskCancellationHandler {
                await withCheckedContinuation { continuation in
                    // Detach the `next` method from the current parent task.
                    Task.detached {
                        var mutableIterator = box.iterator
                        continuation.resume(returning: await mutableIterator.next())
                    }
                }
            } onCancel: { [continuation] in
                // Forward the cancellation manually to the termination handler, then remove it so that
                // during a subsequent `next` call we do not signal another cancellation.
                let handler = continuation.onTermination
                continuation.onTermination = nil
                handler?(.cancelled)
            }
        }
    }

    public func makeAsyncIterator() -> Iterator {
        Iterator(continuation: continuation, box: Iterator._Box(iterator: stream.makeAsyncIterator()))
    }
}
Usually the setup is something like this:
AsyncCooperativeStream { continuation in
    let task = Task {
        defer {
            continuation.finish()
        }
        ... // up to me to decide on how to check and respond to cancellation
    }
    continuation.onTermination = { @Sendable _ in
        if !task.isCancelled {
            task.cancel()
        }
    }
}
- At first glance, the `watermark` part is somewhat confusing. This is especially true if we want this API to replace the old one. The developer should not have to care about that part when initially using an `AsyncStream`, therefore I think we should at least provide some kind of general default value for this.
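As a rough illustration of what I mean by a default, here is a self-contained sketch. Everything in it is hypothetical: `BackPressureStrategy`, `general`, and `makeStreamConfiguration` are stand-ins I made up, not the pitched API, and the numbers 2 and 4 are just the placeholders from the examples above, not a recommendation.

```swift
// Hypothetical stand-in for the pitched back-pressure strategy type.
struct BackPressureStrategy: Equatable, Sendable {
    let low: Int
    let high: Int

    static func watermark(low: Int, high: Int) -> BackPressureStrategy {
        precondition(low <= high, "low watermark must not exceed high watermark")
        return BackPressureStrategy(low: low, high: high)
    }

    // Assumed general-purpose default; the actual values would need discussion.
    static let general = BackPressureStrategy.watermark(low: 2, high: 4)
}

// Hypothetical factory: the strategy parameter is defaulted, so a first-time
// user never has to think about watermarks.
func makeStreamConfiguration(
    backPressureStrategy: BackPressureStrategy = .general
) -> BackPressureStrategy {
    backPressureStrategy
}

let simple = makeStreamConfiguration()
let tuned = makeStreamConfiguration(backPressureStrategy: .watermark(low: 8, high: 16))
print(simple, tuned)
```

Callers who care about back pressure can still pass an explicit strategy, while everyone else gets something reasonable without reading up on watermarks first.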