Thank you for this proposal (+1). I like that we are finally addressing the challenges of bridging the synchronous world into async sequences when producing elements. Can you please provide more info on the backpressure strategy?
I have never heard the term watermark used in this context, and as someone who would create a stream, I would like to know what good/appropriate values for the watermarks are, along with some suggestions on how best to use them.
Having just written a backpressure-aware stream type for my own use, huge +1 to having this be part of the standard library. One addition that would be nice, which I have in my system but is not yet outlined here, is access to the source directly in an initializer, something like (warning: not real code, pick better names, etc…):
func dataStream() -> AsyncStream<Thing.Data> {
    // Hypothetical initializer: the closure receives the source and starts running right away.
    AsyncStream(consuming: { source in
        let setup = await complicatedSetup()
        for thing in setup.things { // not necessarily an async sequence
            let data = await thing.computeComplicatedData()
            // Suspends here while the stream exerts backpressure.
            await source.write(data)
        }
        // Logically, the end of the stream
    })
}
Obviously this omits the entire state machine, but it makes it trivially easy to wrap a bunch of code that kicks off right away into a sequence that can be consumed elsewhere at the consumer's discretion. For me, it was useful that this was started immediately using an unstructured Task (that inherits TaskLocals, for instance):
... but I can imagine use cases where a detached Task could be useful, though maybe that is a scenario where makeStream could give you more control.
For throwing variants, the Task would have a do {} catch {} to tell the source that an error occurred and that the next call to next() should throw. Throwing reference implementation: GitHub
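To make the do {} catch {} idea concrete, here is a minimal sketch. It uses the AsyncThrowingStream.makeStream API that already ships in the standard library (Swift 5.9+), which hands back a continuation rather than the pitched backpressure-aware source. SetupFailed, computeValue and consumeUntilFailure are hypothetical names invented for this illustration.

```swift
// Hypothetical error and work function, used only for illustration.
struct SetupFailed: Error {}

func computeValue(_ i: Int) throws -> Int {
    if i == 3 { throw SetupFailed() }
    return i * 10
}

// Produces values from an unstructured Task and forwards any error to the consumer.
func consumeUntilFailure() async -> (values: [Int], failed: Bool) {
    let (stream, continuation) = AsyncThrowingStream<Int, Error>.makeStream()

    Task {
        do {
            for i in 1...5 {
                continuation.yield(try computeValue(i))
            }
            continuation.finish()
        } catch {
            // Tell the stream that iteration should throw once buffered elements are drained.
            continuation.finish(throwing: error)
        }
    }

    var values: [Int] = []
    do {
        for try await value in stream {
            values.append(value)
        }
        return (values, false)
    } catch {
        return (values, true)
    }
}
```

Elements yielded before the failure are still delivered; the error surfaces on the call to next() that follows them.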
Watermarking is a common backpressure strategy used in various places such as networking. The idea is that new values should be produced until the high watermark is hit. Once the high watermark has been hit, backpressure should be exerted on the producer until enough elements have been consumed to hit the low watermark. Once that happens, the producer should resume producing. The point of watermarking is that there is a range between the low and high watermarks where production and consumption can happen in parallel without the producer flip-flopping between running and suspended.
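As a rough illustration of that rule, here is a small self-contained sketch of the bookkeeping. It is not the pitched implementation: the type WatermarkBackpressure and its methods are hypothetical, and treating "at or below low" and "at or above high" as the thresholds is an assumption.

```swift
// Hypothetical sketch of watermark bookkeeping; names and exact thresholds are assumptions.
struct WatermarkBackpressure {
    let low: Int
    let high: Int
    private(set) var buffered = 0
    // True while the producer is allowed to keep producing.
    private(set) var hasDemand = true

    init(low: Int, high: Int) {
        precondition(low >= 0 && low <= high, "low must be in 0...high")
        self.low = low
        self.high = high
    }

    /// Records one produced element and returns whether the producer may continue.
    @discardableResult
    mutating func didProduce() -> Bool {
        buffered += 1
        if buffered >= high { hasDemand = false } // high watermark hit: apply backpressure
        return hasDemand
    }

    /// Records one consumed element and returns whether the producer may continue.
    @discardableResult
    mutating func didConsume() -> Bool {
        precondition(buffered > 0, "nothing to consume")
        buffered -= 1
        if buffered <= low { hasDemand = true } // low watermark hit: resume production
        return hasDemand
    }
}
```

With low: 2 and high: 4, the fourth produced element stops the producer, and it only resumes once the buffer has drained back to two elements. Between those two points, single produce and consume events do not change the producer's state, which is the flip-flopping the range is meant to avoid.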
Setting the right values fully depends on your use-case and what types you have in your asynchronous stream.
The current pitch doesn't add an initializer at all but rather a factory method called makeStream that returns the stream and the source. You can then use the source and do whatever work you want to kick off right away. You can use the source via structured concurrency or in unstructured concurrency via Task {} and Task.detached {}.
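For a feel of the factory-method shape, here is a minimal sketch using the AsyncStream.makeStream that already exists in the standard library (Swift 5.9+). Note the difference: the shipping method returns a continuation with no backpressure, whereas the pitch would return a source. The function name collectThree is hypothetical.

```swift
// Sketch using the existing factory method; the pitched variant would return a source instead.
func collectThree() async -> [Int] {
    let (stream, continuation) = AsyncStream.makeStream(of: Int.self)

    // Kick off production right away in an unstructured task.
    // Task.detached { ... } would work the same way without inheriting task-local values.
    Task {
        for i in 1...3 {
            continuation.yield(i)
        }
        continuation.finish()
    }

    // Consume at the caller's own pace.
    var received: [Int] = []
    for await value in stream {
        received.append(value)
    }
    return received
}
```

Because the stream and the producing handle are returned together, the caller decides where and how production runs, which covers the "start immediately" use case without a dedicated initializer.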