Not quite sure what you mean, but your mention of the closure passed to AsyncThrowingStream made me think there might be something else useful to know here:
Quite soon after Async(Throwing)Stream was released, it became apparent that its initialiser was overly restrictive. Shortly after, a community-sanctioned pattern emerged which essentially 'breaks out' the continuation from the initialiser.
You should avoid doing this with any other concurrency continuations, i.e. with(Checked|Unsafe)Continuation, etc., but with Async(Throwing)Stream it's a perfectly reasonable thing to do. The continuation passed in to the initialiser closure is actually quite flexible: it conforms to Sendable and can be safely used outside of its closure.
In fact, there's now a pending pitch to make this pattern part of the stdlib.
You'd use it something like:
/// Stream throws an error if a sculpting operation cancelled the save.
func save() -> AsyncThrowingStream<VoxelDataChunk, Error> {
    let (stream, continuation) = AsyncThrowingStream.makeStream(of: VoxelDataChunk.self)
    self.continuation = continuation
    self.task = Task {
        ...
    }
    return stream
}
You can then finish(throwing:) the continuation/cancel the Task elsewhere, for example in your sculpt method, if that helps.
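To make that concrete, here's a rough sketch of how save and sculpt might fit together. Everything apart from the AsyncThrowingStream API itself is made up for illustration: the VoxelStore class, the SaveCancelled error, the shape of VoxelDataChunk, and the fake "work" loop are all assumptions on my part. It breaks the continuation out inline rather than via makeStream so it stands on its own.

```swift
// Hypothetical stand-ins for the real types in your project.
struct VoxelDataChunk: Sendable, Equatable { let index: Int }
struct SaveCancelled: Error {}

final class VoxelStore {
    // Not synchronised: a real implementation would likely be an actor
    // or protect these with a lock.
    private var continuation: AsyncThrowingStream<VoxelDataChunk, Error>.Continuation?
    private var task: Task<Void, Never>?

    /// Stream throws an error if a sculpting operation cancelled the save.
    func save() -> AsyncThrowingStream<VoxelDataChunk, Error> {
        // 'Break out' the continuation from the initialiser closure.
        var brokenOut: AsyncThrowingStream<VoxelDataChunk, Error>.Continuation!
        let stream = AsyncThrowingStream<VoxelDataChunk, Error> { brokenOut = $0 }
        let continuation = brokenOut!
        self.continuation = continuation
        self.task = Task {
            // Placeholder for the real save work: emit chunks slowly.
            for index in 0..<1000 {
                guard !Task.isCancelled else { return }
                continuation.yield(VoxelDataChunk(index: index))
                try? await Task.sleep(nanoseconds: 1_000_000)
            }
            continuation.finish()
        }
        return stream
    }

    /// A sculpting operation invalidates any save that is in flight.
    func sculpt() {
        task?.cancel()
        continuation?.finish(throwing: SaveCancelled())
    }
}

// Consumes a save, sculpts after the first chunk, and reports what happened.
func demoCancelledSave() async -> (received: Int, cancelled: Bool) {
    let store = VoxelStore()
    var received = 0
    do {
        for try await _ in store.save() {
            received += 1
            if received == 1 { store.sculpt() }
        }
        return (received, false)
    } catch {
        return (received, error is SaveCancelled)
    }
}
```

Any chunks already buffered when sculpt is called are still delivered before the error surfaces, so the consumer may see more than one chunk before the loop throws.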
Here are the utility methods I'm using:
public extension AsyncStream {
    static func makeStream(
        of elementType: Element.Type = Element.self,
        bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
    ) -> (stream: Self, continuation: Continuation) {
        // The initialiser runs its closure synchronously, so `continuation`
        // is set by the time the tuple is returned.
        var continuation: Continuation!
        return (Self(bufferingPolicy: limit) { continuation = $0 }, continuation)
    }
}

public extension AsyncThrowingStream {
    static func makeStream(
        of elementType: Element.Type = Element.self,
        throwing failureType: Failure.Type = Failure.self,
        bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
    ) -> (stream: Self, continuation: Continuation) where Failure == Error {
        // Same trick as above: capture the continuation during initialisation.
        var continuation: Continuation!
        return (Self(bufferingPolicy: limit) { continuation = $0 }, continuation)
    }
}
I should also add that the Async(Throwing)Stream types do not apply back pressure; they simply fill the buffer as fast as they can. If you wish to limit the rate of production to the rate of consumption, you'll need to use another type of AsyncSequence. I'd recommend checking out the AsyncThrowingChannel type in the Swift Async Algorithms project.
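If it helps to see the lack of back pressure in action, here's a small sketch using only the standard library. The function name collectAfterBurst is just something I made up for the example. The producer yields five values before anyone is listening; yield never suspends, so with a bounded buffer the older values are simply dropped.

```swift
// Yields a burst of values with no consumer attached, then drains the stream.
func collectAfterBurst() async -> [Int] {
    var brokenOut: AsyncStream<Int>.Continuation!
    let stream = AsyncStream<Int>(bufferingPolicy: .bufferingNewest(2)) { brokenOut = $0 }
    let continuation = brokenOut!

    for value in 1...5 {
        // Returns immediately; the producer is never made to wait.
        continuation.yield(value)
    }
    continuation.finish()

    // Only what survived in the buffer is delivered.
    var received: [Int] = []
    for await value in stream {
        received.append(value)
    }
    return received // [4, 5]
}
```

With the default .unbounded policy nothing would be dropped, but the buffer would grow for as long as the producer outpaces the consumer, which is the situation a channel-style type is meant to avoid.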