Back-pressure support in particular will be very helpful. It's currently hard to use `AsyncStream` for many types of real-world tasks because it can easily run off into pathological cases (e.g. enumerating a folder hierarchy into a processing task that's slow[er], resulting in unbounded memory growth within the buffer). Worse, I fear some people using it currently don't understand these risks, and are producing poorly-behaved applications as a result.
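To make that failure mode concrete, here's a minimal sketch (the folder path and the slow `process(_:)` function are stand-ins): a producer enumerating a directory as fast as the filesystem allows, feeding a much slower consumer through the default unbounded buffer.

```swift
import Foundation

// Stand-in for slow per-item work (hashing, uploading, etc.).
func process(_ url: URL) async {
    try? await Task.sleep(nanoseconds: 100_000_000)
}

// The default buffering policy is .unbounded, so yield(_:) never pushes back.
let (stream, continuation) = AsyncStream.makeStream(of: URL.self)

// Producer: walks the folder hierarchy as fast as the filesystem allows.
Task.detached {
    let enumerator = FileManager.default.enumerator(
        at: URL(fileURLWithPath: "/some/large/folder"), // stand-in path
        includingPropertiesForKeys: nil
    )
    while let url = enumerator?.nextObject() as? URL {
        continuation.yield(url) // never suspends; the internal buffer just keeps growing
    }
    continuation.finish()
}

// Consumer: far slower than the producer, so memory grows without bound.
for await url in stream {
    await process(url)
}
```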
So big +1 on improvements in this area.
Versus `AsyncChannel`
To me, `AsyncChannel` is rarely useful in practice because it doesn't buffer values - it's essentially just an elaborate way to move elements one at a time between two tasks (or to divide work between tasks, for which there are simpler existing alternatives like `DispatchQueue.concurrentPerform(iterations:execute:)`).
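For contrast with the `AsyncStream` sketch above, here's what that hand-off behaviour looks like: `send(_:)` suspends until a consumer actually takes the element, so there's no buffer to grow (but also nothing to smooth over bursts).

```swift
import AsyncAlgorithms

let channel = AsyncChannel<Int>()

// Producer: each send(_:) suspends until the consumer below receives the value.
Task {
    for i in 0..<5 {
        await channel.send(i)
        print("sent \(i)")
    }
    channel.finish()
}

// Consumer: elements are handed over one at a time; nothing is buffered.
for await value in channel {
    print("received \(value)")
}
```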
However, with this change `AsyncChannel` becomes increasingly similar to `AsyncStream`. Confusingly similar, I fear.
`AsyncChannel` seems to still have some superior attributes even given this proposal, such as explicit multi-consumer support and well-defined consumer order.
I don't know which one makes the best starting point, but is there a reason that one of these two cannot be made a functional superset of the other, allowing the other to be deprecated?
Callback token
Can you elaborate on why the callback token API is necessary? The native `try await source.write(contentsOf: sequence)` is clearly the best option, and I see the utility of `try source.write(contentsOf:onProduceMore:)` for interfacing with non-Swift-Concurrency code, but the callback token bit seems odd.

It also seems to open up an error case that the other two don't: what if the producer receives the `enqueueCallback` result but fails to enqueue the callback?
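For reference, this is how I read the three producer variants in the proposal - treat it as a paraphrase rather than compiling code, since `source` and `elements` are stand-ins and the signatures are approximate:

```swift
// 1. Async, back-pressured: suspends until the stream is ready for more.
try await source.write(contentsOf: elements)

// 2. Callback-based: onProduceMore is invoked once the stream wants more,
//    which suits producers living outside Swift Concurrency.
try source.write(contentsOf: elements) { result in
    // resume the external producer here
}

// 3. Manual: write, inspect the result, then enqueue the callback yourself.
switch try source.write(contentsOf: elements) {
case .produceMore:
    break // keep producing immediately
case .enqueueCallback(let token):
    source.enqueueCallback(callbackToken: token) { result in
        // resume producing - but what if this step is skipped, or the token
        // is simply dropped on the floor?
    }
}
```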
Buffering vs back-pressure
Can you spell out why a whole new variant of `makeStream(…)` is necessary, as opposed to just adding a back-pressure option to `BufferingPolicy`? Is it just because the proposed design creates a kind of parallel implementation based around `AsyncStream.Source`?
I expect it'll be a source of confusion to users of `AsyncStream` that there are these two options, with very different APIs on the producer side.
Especially given that the proposal itself points out that the existing API isn't ideal and that this is basically implementing a new one, should it perhaps just lean fully into that and propose the new API as a replacement for the old? That would of course mean supporting the existing buffer policies as well.
`write(…)` behaviour
The `contentsOf:` methods' documentation says:

> If there is a task consuming the stream and awaiting the next element then the task will get resumed with the first element of the provided sequence.
I find this oddly confusing, even though technically it's accurate. The first couple of times I read it, it sounded like `AsyncChannel`'s behaviour, with no buffering inside `AsyncStream` itself. I don't think it's necessary to spell out the intuitive and expected behaviour re. consumers reading items from `AsyncStream`s - unless there's some subtle implication which this isn't spelling out, like the consumer executing in the producer's task or some such (which as far as I know is impossible, but it could still be confusing to those not completely familiar with Swift Concurrency).
I think it's more important to remind readers that `write(…)` will eagerly consume elements from the given sequence into the stream's internal buffer, up to the high-water mark, only blocking [the producer] if it can't immediately fit all of the sequence's elements. That would help ensure producer authors don't see only one of those two behaviours in their tests and forget about the other possibility.
> If the asynchronous stream already terminated then this method will throw an error indicating the failure.
What if the stream terminates while `write(…)` is running, e.g. because the consumer lets the stream deinit?
Water level
The existing `AsyncStream` API reports the buffer capacity remaining - this new API doesn't seem to have any equivalent functionality? It can be useful to have that available to producers so that they can e.g. log the value for debugging.

A more independent API for this could be even more useful, such as a way to register a closure that's called whenever the buffer usage changes. This is important for cases like charting that want to see the exact time of each buffer usage change.
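For reference, today's API surfaces this on every write via `YieldResult.enqueued(remaining:)`, which is easy to log:

```swift
// Consumer side elided for brevity; .bufferingNewest(100) gives a bounded
// buffer so the remaining-capacity number is meaningful.
let (_, continuation) = AsyncStream.makeStream(
    of: Int.self,
    bufferingPolicy: .bufferingNewest(100)
)

if case .enqueued(let remaining) = continuation.yield(42) {
    print("buffer has \(remaining) free slots") // log, chart, etc.
}
```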
Water marks
Could the low & high be expressed through a `RangeExpression<Int>`? That'd make it possible to have half-open settings (a rough sketch of the mapping is at the end of this section), like:
- `.watermark(...10)` for an implicit low water mark of 0.
- `.watermark(5...)` for an unbounded buffer; a somewhat special but useful case of a stream that simply endeavours to pre-buffer at least 5 items and won't eagerly try to buffer `Int.max` items, but also will never block producers from writing.
It'd also allow a completely open setting, via `UnboundedRange_`, which is arguably defeating the point (since it's equivalent to the existing `unbounded` `BufferingPolicy`) but could be useful as an option for code that [only] sometimes wants that unbounded, non-eager-producing behaviour and wants to use a single version of the `AsyncStream` producer API. It could be useful in tests, for example, to make the producer-consumer interactions more deterministic. (It's also possible with the API as currently proposed anyway, by passing 0 and `Int.max` as the watermarks.)
It'd make sense to change the enum value to `buffer` or `buffered` in this case, too.
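To illustrate the range idea, here's a rough, purely hypothetical sketch of how the different range forms could map onto the low and high marks (none of these names exist in the proposal):

```swift
// Hypothetical mapping from range syntax to (low, high) water marks.
func watermarks(_ range: ClosedRange<Int>) -> (low: Int, high: Int) {
    (range.lowerBound, range.upperBound)   // .watermark(5...10)
}
func watermarks(_ range: PartialRangeThrough<Int>) -> (low: Int, high: Int) {
    (0, range.upperBound)                  // .watermark(...10): implicit low of 0
}
func watermarks(_ range: PartialRangeFrom<Int>) -> (low: Int, high: Int) {
    (range.lowerBound, Int.max)            // .watermark(5...): never blocks producers
}

let bounds = watermarks(5...)              // (low: 5, high: Int.max)
```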
Future directions
The proposal already mentions some attractive additional back-pressure strategies. It might be worth also suggesting a customisable one, which takes a closure or similar to evaluate if/when to prompt producers for more items given the current buffer (which the closure can inspect to make its decision). That could also handle the specific case of implementing a bounded-memory policy, which cannot be handled generically since the effective memory footprint of an arbitrary `Element` type requires custom logic to determine.
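A purely hypothetical sketch of the shape I have in mind (none of these names exist): the strategy is just a predicate over the current buffer, which makes a bounded-memory policy trivial to express for a concrete element type.

```swift
import Foundation

// Hypothetical: a custom strategy is a predicate over the buffered elements,
// answering "should the producer be asked for more right now?"
typealias ShouldProduceMore<Element> = @Sendable (_ buffer: [Element]) -> Bool

// Example: a bounded-memory policy for Data payloads, allowing more
// production only while the buffered bytes stay under ~64 MiB.
let boundedMemory: ShouldProduceMore<Data> = { buffer in
    buffer.reduce(0) { $0 + $1.count } < 64 * 1024 * 1024
}
```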
On the deprecation point, the proposal says:

> In the future, we could deprecate the current continuation based APIs since the new proposed APIs are also capable of bridging non-backpressured producers by just discarding the `WriteResult`
Is that strictly true? Doesn't the caller also need to specify an `Int.max` high water mark? Some of the write methods are async and [presumably] block if the stream's buffer is full.
Beyond that nit-pick, I do not like the idea of having two parallel APIs for `AsyncStream`, as mentioned earlier. So it'd be good to have a clearer and stronger deprecation plan.