Back-pressure support in particular will be very helpful. It's currently hard to use `AsyncStream` for many real-world tasks because it can easily run into pathological cases (e.g. enumerating a folder hierarchy into a processing task that's slow[er], resulting in unbounded memory growth within the buffer). Worse, I fear some people currently using it don't understand these risks, and are producing poorly-behaved applications as a result.

So a big +1 on improvements in this area.
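To make the failure mode concrete, here's a minimal sketch of the pathological case using today's shipping API (the paths, counts, and delays are made up for illustration):

```swift
// A fast producer feeding a slow consumer through an unbounded AsyncStream.
// Nothing ever suspends the producer, so the internal buffer just grows.
let (stream, continuation) = AsyncStream.makeStream(
    of: String.self,
    bufferingPolicy: .unbounded
)

// Producer: e.g. enumerating a large folder hierarchy. `yield` never
// suspends, and with `.unbounded` it never drops elements either.
Task {
    for i in 0..<1_000_000 {
        continuation.yield("/some/folder/file-\(i)")
    }
    continuation.finish()
}

// Consumer: much slower than the producer, e.g. processing each file.
Task {
    for await path in stream {
        try? await Task.sleep(for: .milliseconds(10)) // simulated slow work
        _ = path
    }
}
```

The producer finishes almost immediately, leaving potentially hundreds of thousands of elements sitting in the stream's buffer while the consumer slowly drains them.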
`AsyncChannel` is rarely useful in practice because it doesn't buffer values - it's essentially just an elaborate way to move elements one at a time between two tasks (or to divide work between tasks, for which there are simpler existing alternatives like …).

With this change, `AsyncChannel` will become increasingly similar to `AsyncStream`. Confusingly similar, I fear. That said, `AsyncChannel` still seems to have some superior attributes even given this proposal, such as explicit multi-consumer support and a well-defined consumer order.

I don't know which one makes the better starting point, but is there a reason that one of these two cannot be made a functional superset of the other, allowing the other to be deprecated?
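For contrast, here's the unbuffered hand-off that makes `AsyncChannel` feel like "an elaborate way to move elements one at a time" (this uses the actual `swift-async-algorithms` API):

```swift
import AsyncAlgorithms

let channel = AsyncChannel<Int>()

Task {
    for i in 1...3 {
        // `send` suspends until a consumer has taken the element;
        // there's no buffer, just a rendezvous between the two tasks.
        await channel.send(i)
    }
    channel.finish()
}

for await value in channel {
    print(value)
}
```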
Can you elaborate on why the callback token API is necessary? The native `try await source.write(contentsOf: sequence)` is clearly the best option, and I see the utility of `try source.write(contentsOf:onProduceMore:)` for interfacing with non-Swift-Concurrency code, but the callback token variant seems odd.

It also seems to open up an error case that the other two don't have: what if the producer receives the `enqueueCallback` result but fails to enqueue the callback?
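For reference, my reading of the proposed flow is roughly the following (sketched from the proposal's names, from memory; treat it as illustrative rather than exact):

```swift
// `source` is the proposed write-side handle; the write result tells the
// producer whether to keep going or to wait for a callback.
let result = try source.write(contentsOf: [1, 2, 3])
switch result {
case .produceMore:
    produceMore() // hypothetical producer routine
case .enqueueCallback(let callbackToken):
    // This second step is the error case in question: if the producer
    // receives the token but never makes this call, it will never be
    // told when it's allowed to produce again.
    source.enqueueCallback(callbackToken: callbackToken) { result in
        // resume producing (or handle termination)
    }
}
```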
**Buffering vs back-pressure**
Can you spell out why a new `bufferingPolicy:` version of `makeStream(…)` is necessary, as opposed to just adding a back-pressure option to `BufferingPolicy`? Is it just because the proposed design creates a kind of parallel implementation based around …?

I expect it'll be a source of confusion to `AsyncStream` users that there are these two options, with very different APIs on the producer side. Especially given that the proposal itself points out that the existing API isn't ideal and that this is basically implementing a new one, should it perhaps lean fully into that and propose this new API as a replacement for the old - and therefore support the existing buffering policies as well?
The `contentsOf:` methods' documentation says:

> If there is a task consuming the stream and awaiting the next element then the task will get resumed with the first element of the provided sequence.

I find this oddly confusing, even though it's technically accurate. The first couple of times I read it, it sounded like `AsyncChannel`'s behaviour, with no buffering inside `AsyncStream` itself. I don't think it's necessary to spell out the intuitive, expected behaviour of consumers reading items from `AsyncStream`s - unless there's some subtle implication that isn't being spelled out, like the consumer executing in the producer's task or somesuch (which as far as I know is impossible, but it could still be confusing to those not completely familiar with Swift Concurrency).

I think it's more important to remind readers that `write(…)` will eagerly consume elements from the given sequence into the stream's internal buffer, up to the high-water mark, only blocking [the producer] if it can't immediately fit all the elements from the sequence. That helps ensure producer authors don't see only one of those two behaviours in their tests and forget about the other possibility.
> If the asynchronous stream already terminated then this method will throw an error indicating the failure.

What if the stream terminates while `write(…)` is running, e.g. because the consumer lets the stream deinit?
The existing `AsyncStream` API reports the remaining buffer capacity; this new API doesn't seem to have any equivalent functionality. It can be useful to have that available to producers so that they can e.g. log the value for debugging.

A more independent API for this could be even more useful, such as a way to register a closure that's called whenever the buffer usage changes. That's important for cases like charting, which want to see the exact time of each change in buffer usage.
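For reference, the existing mechanism I mean is the `remaining` value returned from `yield(_:)`. This is shipping API, though the capacity chosen here is just illustrative:

```swift
let (stream, continuation) = AsyncStream.makeStream(
    of: Int.self,
    bufferingPolicy: .bufferingNewest(8)
)

switch continuation.yield(42) {
case .enqueued(let remaining):
    print("enqueued; roughly \(remaining) buffer slots left") // debugging aid
case .dropped(let element):
    print("buffer full; dropped \(element)")
case .terminated:
    print("stream already finished")
@unknown default:
    break
}
```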
Could the low & high water marks be expressed through a `RangeExpression<Int>`? That'd make it possible to have half-open settings, like:

- `.watermark(...10)` for an implicit low water mark of 0.
- `.watermark(5...)` for an unbounded buffer - a somewhat special but useful case of a stream that simply endeavours to keep at least 5 items pre-buffered, and won't eagerly try to buffer `Int.max` items but also will never block producers from writing.
It'd also allow a completely open setting, via `UnboundedRange_`, which arguably defeats the point (since it's equivalent to the existing `BufferingPolicy.unbounded`), but it could be useful as an option for code that [only] sometimes wants that unbounded, non-eager-producing behaviour and wants to use a single version of the `AsyncStream` producer API. It could be handy in tests, for example, to make the producer-consumer interactions more deterministic. (It's also possible with the API as currently proposed anyway, by passing 0 and `Int.max` as the watermarks.)
It'd make sense to rename the enum case to `buffered` in that case, too.
The proposal already mentions some attractive additional back-pressure strategies. It might be worth also suggesting a customisable one, which takes a closure or similar to evaluate if/when to prompt producers for more items given the current buffer (which it can inspect to make its decision). That could also handle the specific case of implementing a bounded-memory policy, which cannot be handled generically since determining the effective memory footprint of an arbitrary `Element` type requires custom logic.
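A rough sketch of what I mean (entirely hypothetical - the `custom` case, its closure shape, and a per-element-inspectable buffer are all my inventions, not anything in the proposal):

```swift
// Hypothetical customisable strategy: the closure inspects the current
// buffer and returns whether producers should be prompted for more.
let strategy = BackPressureStrategy<Data>.custom { buffer in
    // Bounded-memory policy: produce more only while the buffered
    // payloads total less than 1 MiB.
    buffer.reduce(0) { $0 + $1.count } < 1_048_576
}
```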
> In the future, we could deprecate the current continuation based APIs since the new proposed APIs are also capable of bridging non-backpressured producers by just discarding the …

Is that strictly true? Doesn't the caller also need to specify an `Int.max` high water mark? Some of the write methods are async and [presumably] suspend if the stream's buffer is full.

Beyond that nit-pick, I don't like the idea of having two parallel APIs to `AsyncStream`, as mentioned earlier. So it'd be good to have a clearer and stronger deprecation plan.