SE-0406: Backpressure support for AsyncStream

I'm not sure that's true. Users are just as likely (I think) to want to use AsyncStream for producer-consumer scenarios, for example. Stream to me says, well, a stream. If you split a literal stream into multiple forks, you don't clone the whole river, you just get subsets of it going to different places.

Users will tend to presume any given API works the way they wish it works. So in any case the documentation and API itself need to be as clear as possible about how it actually works.

Implementing a FCFS (first-come first-served) behaviour for consumers would presumably be quite a lot simpler than a broadcasting behaviour (since as you allude, broadcasting requires pre-registration or similarly more complicated APIs). You already need appropriate mutual exclusion in next w.r.t. production.

This would be a reasonable solution too. Is there a tangible timeline for those prerequisites, though?

2 Likes

So to clarify, the divergence will continue to exist, it's just that the new API specifically will be 'any' cast in both the throwing and now-throwing variants?

How would this work with transforming asynchronous sequences like map or AsyncMapSequence? Would it mean we'd need to introduce a reconsuming keyword?

1 Like

No, the new API will have the same behavior in the non-throwing and throwing stream. Furthermore, the new API will be unicast and will enforce this by fatal erroring when trying to create subsequent iterators.

These are all good questions and I have to defer to other people on this but async sequence do want to express throwing, sendable and consuming in some kind of effect polymorphism at best. However if that is doable or not is better placed whenever consumable sequences are tackled.

1 Like

I think I appreciate the need for WriteResult – to tell the writer whether it go right ahead and throw another value on the stream, or whether it needs to take a pause and wait for the consumer to prompt it to continue.

However, I don't understand why in the "hold your horses" situation: case enqueueCallback, a CallbackToken is provided, to be used with the enqueueCallback(token:onProduceMore:) method.

Could we instead have…

@frozen
public enum WriteResult: Sendable {
    /// Indicates that more elements should be produced and written to the source.
    case produceMore

    /// Indicates writing should suspend until 
    case waitUntil(prompt: () async -> ())
}

?

A producer encountering a prompt can await prompt { // write more values in here }

Thanks!

I think you are mixing two use-cases here. The proposal offers a func write(Element) async throws that offers exactly what you want. It writes the element and returns right away if there is more demand. If there is no demand it suspends the call to write and only resumes it when demand arises again or the consumer terminated.

The WriteResult based API are for producers which are not asynchronous but rather synchronous. It allows to them to write elements and in the fast path avoid the allocation for the closure. The two synchronous write APIs can be used to bridge various kinds of producers like a blocking producer or something build on top of swift-nio's EventLoopFutures.

1 Like

Hello @FranzBusch! Thank you for proposal, I will join others by heavy +1!

One question I have that I didn't find answer not in the thread neither in source code. Maybe I miss something. Let's say I have:

let (stream, source) = AsyncStream.makeStream(
    of: Int.self,
    backpressureStrategy: .watermark(low: 1_000, high: 5_000)
)

then I do something like:

var array: [Int] = .init(repeating: 0, count: 1_000_000) // Or any other way how I get 1_000_000 items

try source.write(contentsOf: array, onProduceMore: { result in
    switch result {
    case .success:
        // Trigger more production
    case .failure(let error):
        // Terminate the underlying producer
    }
})

For async write I guess there will be number of suspentions while conumer will not consume all entires and size of the stream will not grow more then 5K items.

But there is write which is not asynchronous, so there is no suspension point. Does it mean that 1M items will be actually added and onProduceMore will be clawed only when stream size will drop under low watermark?

Doc says abount high watermark:

/// When the high watermark is reached producers will be suspended. All producers will be resumed again once the low watermark is reached.

I read it like that high watermark doesn't really control the size of the stream. So my question how the stream will behave in this case? And is ther any way to fix the size of the steam that client can't add more then specified.

Sorry if it's covered somewhere in proposal our in the thread, I looked around, but didn't find the answer.

That’s a great question! So the stream will accept all the elements and add them to its buffer. It will also not call the closure you provided until its consumer has consumed below the low watermark. The proposed interface here will not drop any items. If you want that behavior you can get it by composition via the buffered algorithm from swift-async-algorithms.

We could add a strict mode to the watermark strategy that drops any element above the high watermark but I would be interested to see what benefit that has compare to the compositional approach.

2 Likes

Thanks for your reply @FranzBusch!

The use case what I keep in mind is if I would like to expose AsyncStream.Source as public API for some external developers for use. For example for communication between externally made plugin and my backend (by external I mean really external, not even from the same company/team)

If I do so, then it's quite complex to me to keep control on memory footprint since as you confirmed all 1_000_000 items will be buffered, so I as backend developer I can't prevent plugin developer to do write too much at single point in time.

So it's why some kind of strict mode is desired for me since then I can keep constant memory footprint and push external developer in right direction by using right API and think about back pressure on plugin side as well.

Regarding you suggestion of using buffered algorithms, do I understand that your proposal is to use AsyncBufferedStream in front of AsyncStream? So bounded AsyncBufferedStream is exposed as public API, then some task dispatch from buffered stream to AsyncStream? But I guess in this case I can read directly from AsyncBufferedStream in my backend, so I don't need to use AsyncStream at all? Or do I miss your proposal?

1 Like

I see. I would recommend never exposing the AsyncStream.Source nor the stream itself in any public API since it’s going to restrict your ability to evolve the API. So in your case I would create a separate type e.g. FooWriter that you pass to the third parties. This type might be backed by the streams source but you can change this at any time.

Furthermore, with your example I don’t see a need for a strict mode. You have to consume the stream that belongs to the source in your code somewhere. Wherever that is just apply a buffered(policy:) with whatever fits your use-case. This way the buffer of the stream should stay almost empty and you can control the maximum depth.

1 Like

I was thinking it could be useful to have a version of write(contentsOf:) that writes as much of the sequence as will fit below the high-water mark, and then stops. (And provides the number of consumed elements to the onProduceMore closure/callback.)

The goal being to allow the caller to write a very large number of elements (e.g. the array from @mr-swifter's comment) without blowing through the high-water mark.

I eventually convinced myself that such an algorithm can be built on top of write(contentsOf:) (e.g. using AsyncSequence.chunks(ofCount:)), so it's not needed at this level.

But I thought I'd post this, in case anyone else is having similar thoughts.

If your producer has already made excess values, refusing to accept them won't help anything. You've already paid the production cost and the memory cost.

At best you could argue it's extra hassle for the producer to handle the stream's refusal of the "excess" values which might encourage code authors to be more careful, but it might result in buggier producers too, as a result of that more complicated API. If nothing else it is less efficient, since more back-and-forth is required between the producer and the stream, with potentially more copying.

Plus, it might be the intrinsic nature of the producer that it makes values in batches of a certain or indeterminate size, and for those cases you'd then be unhelpfully penalising them as well. e.g. making a sequence of network requests where each response can contain an unpredictable number of results.

Thus I think any forceful refusals to buffer should be separate from AsyncStream, handled instead by e.g. AsyncBufferSequence only in cases where the author believes it's necessary.

A runtime warning (via oslog or similar) might be a reasonable compromise for cases where a stream is "overfed". Though if there's ultimately an API provided to be notified when the stream's buffer changes level, then that alert could easily be implemented by the user (in whatever way best suits them).

I agree with @wadetregaskis here. If the producer has already produced a large amount of elements then accepting just a few at a time is highly inefficient unless we straight drop them. We would have to slice off the provided sequence and would have to acquire the lock every time. Instead we can just accept all, buffer them and let back pressure do the rest.

In the case where one does not control the producer and wants to avoid a potential large memory footprint of the stream applying the a buffer algorithms seems like a perfect solution to me.

1 Like

Ah, that makes sense! Thank you – I feel a bit silly now as I did not look close enough :slight_smile:

That being the case, would it make sense for the async write methods to be declared earlier in Source?

1 Like

What benefit would you expect an earlier declaration to provide? Really curious because I don’t expect a lot of people to look at the interface of the standard library but rather let auto completion suggest the correct method. In an async context you should get the async write methods suggested due to how overload resolution in the compiler is implemented.

2 Likes

What benefit would you expect an earlier declaration to provide?

Just that if someone is browsing the API code then they will see the method they probably want, or are most likely interested in, declared earlier on?

… I do quite often navigate through to standard library code and look at the whole interface. I suppose I'm seeing the generated version of the interface, rather than the actual implementation code, but … I for one go looking :-)

2 Likes

7 posts were split to a new topic: Navigating HTML docs vs generated interfaces

The language steering group has decided to return this proposal for revision.

Xiaodi Wu
Review Manager

1 Like