[Pitch] New APIs for Async[Throwing]Stream with backpressure support

Hi Franz

Thank you for this proposal (+1). I like that we are finally addressing the challenges of bridging the synchronous world into async sequences when producing elements. Could you provide more information on the backpressure strategy?
I have never heard the term "watermark" used in this context. As someone who would create streams, I would like to know what good/appropriate values for the watermarks are, along with some suggestions on how best to use them.

Thank you :)

Having just written a backpressure-aware stream type for my own use, I'm a huge +1 on making this part of the standard library. One addition I have in my own system that isn't outlined here yet, and that would be nice to have, is direct access to the source in an initializer, something like this (warning: not real code, pick better names, etc…):

func dataStream() -> AsyncStream<Thing.Data> {
    AsyncStream(consuming: { source in
        let setup = await complicatedSetup()
        for thing in setup.things { // not necessarily an async sequence
            let data = await thing.computeComplicatedData()
            await source.write(data)
        }
        // Logically, the end of the stream
    })
}

Obviously I'm omitting the entire state machine here, but this makes it trivially easy to wrap a bunch of code that kicks off right away into a sequence that can be consumed elsewhere, at the consumer's discretion. For me, it was useful that this started immediately in an unstructured Task (which inherits task-local values, for instance):

extension AsyncStream {
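    init(consuming: @escaping @Sendable (_ source: Source) async -> Void) {
        let (stream, source) = AsyncStream.makeStream(...)
        // Unstructured Task: starts immediately and inherits task-locals and priority.
        Task {
            await consuming(source)
        }
        // An initializer can't `return` a value; assign the new stream to `self`.
        self = stream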
    }
}

... but I can imagine use cases where a detached Task could be useful, though maybe that is a scenario where makeStream could give you more control.
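For anyone wanting this ergonomics today, a rough sketch of such an initializer can be built on Swift 5.9's `AsyncStream.makeStream(of:bufferingPolicy:)`. Assumptions: the `init(producer:)` name is hypothetical, and it uses the shipping `Continuation` instead of the pitched `Source`, so `yield` never suspends and there is no backpressure. It only illustrates the "start immediately in a Task, finish when the closure returns" shape.

```swift
// Hypothetical convenience initializer (name assumed, not a real stdlib API).
// Uses today's Continuation, which does NOT exert backpressure.
extension AsyncStream where Element: Sendable {
    init(
        bufferingPolicy: Continuation.BufferingPolicy = .unbounded,
        producer body: @escaping @Sendable (Continuation) async -> Void
    ) {
        let (stream, continuation) = AsyncStream.makeStream(
            of: Element.self,
            bufferingPolicy: bufferingPolicy
        )
        // Unstructured Task: starts right away, inheriting priority and task-locals.
        let task = Task {
            await body(continuation)
            // Returning from `body` is the logical end of the stream.
            continuation.finish()
        }
        // If the consumer stops iterating, cancel the producing task.
        continuation.onTermination = { _ in task.cancel() }
        self = stream
    }
}

let numbers = AsyncStream<Int>(producer: { continuation in
    for i in 1...3 {
        continuation.yield(i)
    }
})
var collected: [Int] = []
for await n in numbers {
    collected.append(n)
}
print(collected) // [1, 2, 3]
```

Setting `onTermination` is a design choice: it ties the producer's lifetime to the consumer's interest, which an unstructured Task would not otherwise get.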

For throwing variants, the Task would wrap the call in a do {} catch {} to tell the source that an error occurred, so that the consumer's next call to next() throws. Throwing reference implementation: GitHub
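A minimal sketch of that do/catch forwarding, again on the shipping `AsyncThrowingStream.makeStream` rather than the pitched `Source`. The `init(producer:)` name and `SetupFailed` error are hypothetical. Errors from the closure go through `finish(throwing:)`, so the consumer first receives any already-buffered elements and then the error.

```swift
// Hypothetical throwing counterpart (name assumed); no backpressure.
extension AsyncThrowingStream where Element: Sendable, Failure == any Error {
    init(producer body: @escaping @Sendable (Continuation) async throws -> Void) {
        let (stream, continuation) = AsyncThrowingStream<Element, any Error>.makeStream()
        let task = Task {
            do {
                try await body(continuation)
                continuation.finish() // normal end of the stream
            } catch {
                // Buffered elements are delivered first, then next() throws this error.
                continuation.finish(throwing: error)
            }
        }
        continuation.onTermination = { _ in task.cancel() }
        self = stream
    }
}

struct SetupFailed: Error {} // hypothetical error for the example

let values = AsyncThrowingStream<Int, any Error>(producer: { continuation in
    continuation.yield(1)
    throw SetupFailed()
})
var received: [Int] = []
var failedWithSetupError = false
do {
    for try await value in values {
        received.append(value)
    }
} catch {
    failedWithSetupError = error is SetupFailed
}
// received == [1], and the loop then exits by throwing SetupFailed.
```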

Watermarking is a common backpressure strategy used in various places, such as networking. The idea is that new values should be produced until the number of buffered elements hits the high watermark. Once the high watermark has been hit, backpressure should be exerted on the producer until enough elements have been consumed for the buffer to drop to the low watermark. Once that happens, the producer should resume. The point of watermarking is that the range between the low and high watermarks lets production and consumption happen in parallel without the producer flip-flopping between suspended and running.

Setting the right values depends entirely on your use case and on the types of elements in your asynchronous stream.
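The watermark behaviour can be sketched as a tiny state machine, assuming the producer calls `didProduce()` after each write, the consumer calls `didConsume()` after each read, and the producer stops writing while `shouldProduceMore` is `false`. `WatermarkStrategy` and its members are hypothetical names for illustration, not the pitched API, and the 2/5 watermarks are arbitrary example values.

```swift
// Hypothetical helper, not part of any pitched API: tracks buffered elements
// and decides whether the producer may keep writing.
struct WatermarkStrategy {
    let low: Int
    let high: Int
    /// Number of elements produced but not yet consumed.
    private(set) var buffered = 0
    /// `false` while backpressure is being exerted on the producer.
    private(set) var shouldProduceMore = true

    init(low: Int, high: Int) {
        precondition(0 <= low && low < high, "low watermark must be below high watermark")
        self.low = low
        self.high = high
    }

    /// Call after the producer appends one element to the buffer.
    mutating func didProduce() {
        buffered += 1
        if buffered >= high {
            shouldProduceMore = false // high watermark hit: suspend the producer
        }
    }

    /// Call after the consumer takes one element out of the buffer.
    mutating func didConsume() {
        precondition(buffered > 0, "consumed from an empty buffer")
        buffered -= 1
        if buffered <= low {
            shouldProduceMore = true // drained to the low watermark: resume
        }
        // Between the two watermarks the previous decision is kept, which is
        // what stops the producer from flip-flopping.
    }
}

var strategy = WatermarkStrategy(low: 2, high: 5)
for _ in 0..<5 { strategy.didProduce() }
print(strategy.shouldProduceMore) // false: 5 buffered, high watermark hit
strategy.didConsume()             // 4 buffered
strategy.didConsume()             // 3 buffered
print(strategy.shouldProduceMore) // false: still above the low watermark
strategy.didConsume()             // 2 buffered
print(strategy.shouldProduceMore) // true: low watermark reached
```

A wider gap between the watermarks means fewer suspend/resume transitions at the cost of more buffered memory; a narrower gap does the opposite.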

The current pitch doesn't add an initializer at all, but rather a factory method called makeStream that returns the stream and the source. You can then use the source to do whatever work you want to kick off right away, either with structured concurrency or with unstructured concurrency via Task {} and Task.detached {}.
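As a rough illustration of the structured-concurrency option, here is a sketch where the producer runs as a child task of a task group, so it cannot outlive the function. It uses Swift 5.9's shipping `AsyncStream.makeStream(of:)` and its `Continuation` as a stand-in for the pitched source, so there is no backpressure; `sumOfSquares(upTo:)` is a hypothetical example function.

```swift
// Hypothetical example: produce in a child task, consume in the parent task.
func sumOfSquares(upTo n: Int) async -> Int {
    let (stream, continuation) = AsyncStream.makeStream(of: Int.self)
    return await withTaskGroup(of: Void.self, returning: Int.self) { group in
        // Producer: structured child task that writes, then finishes the stream.
        group.addTask {
            for i in stride(from: 1, through: n, by: 1) {
                continuation.yield(i * i)
            }
            continuation.finish()
        }
        // Consumer: iterate until the producer finishes the stream.
        var total = 0
        for await square in stream {
            total += square
        }
        return total
    }
}

let result = await sumOfSquares(upTo: 4)
print(result) // 30
```

With Task {} or Task.detached {} instead of a group, the producer could keep running after the function returns, which is exactly the flexibility (and the risk) of the unstructured options.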

I get that; I'm just suggesting that, from an ergonomics point of view, it would be nice to also have an initializer that wraps it for the case where everything is already async and just needs some massaging to turn it into a stream. The example I shared is a simplification of a pattern I actually use, which I think many others might also find useful: https://github.com/mochidev/CodableDatastore/blob/e816e10fd649ad0aeda407eeefbcc141d35d4dab/Sources/CodableDatastore/Datastore/Datastore.swift#L485-L508

Has this been implemented, or has there been any progress?

The effort has been moved to swift-async-algorithms and recently pitched.

Thank you!