SE-0406: Backpressure support for AsyncStream

Hello Swift community,

The review of SE-0406: Backpressure support for AsyncStream begins now and runs through August 28, 2023.

Reviews are an important part of the Swift evolution process. All review feedback should be either on this forum thread or, if you would like to keep your feedback private, directly to me as the review manager via the forum messaging feature. When contacting the review manager directly, please put "SE-0406" in the subject line.

Trying it out

If you'd like to try this proposal out, you can download a toolchain supporting it for macOS or for Linux.

What goes into a review?

The goal of the review process is to improve the proposal under review through constructive criticism and, eventually, determine the direction of Swift. When writing your review, here are some questions you might want to answer:

  • What is your evaluation of the proposal?
  • Is the problem being addressed significant enough to warrant a change to Swift?
  • Does this proposal fit well with the feel and direction of Swift?
  • If you have used other languages or libraries with a similar feature, how do you feel that this proposal compares to those?
  • How much effort did you put into your review? A glance, a quick reading, or an in-depth study?

More information about the Swift evolution process is available at

https://github.com/apple/swift-evolution/blob/main/process.md

Thank you,

Xiaodi Wu
Review Manager

20 Likes

Back-pressure support in particular will be very helpful. It's currently hard to use AsyncStream for many types of real-world tasks because it can easily run off into pathological cases (e.g. enumerating a folder hierarchy to a processing task that's slow[er], resulting in unbounded memory growth within the buffer). Worse, I fear some people currently using it don't understand these risks, and are producing poorly-behaved applications as a result.

So big +1 on improvements in this area.
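The failure mode described above can be reproduced with today's shipping API in a few lines. This is a sketch, not code from the proposal; `slowProcess` is a stub standing in for any consumer that can't keep up:

```swift
// Stub standing in for any consumer that can't keep up with the producer.
func slowProcess(_ path: String) async {
    try? await Task.sleep(for: .milliseconds(10))
}

// Today's default buffering policy is .unbounded, so a fast producer
// grows the stream's internal buffer without limit.
let (stream, continuation) = AsyncStream.makeStream(of: String.self)

Task {
    // Fast producer, e.g. enumerating a large folder hierarchy.
    for i in 0..<1_000_000 {
        continuation.yield("file-\(i)") // never suspends the producer
    }
    continuation.finish()
}

Task {
    for await path in stream {
        await slowProcess(path) // consumer lags; the buffer balloons
    }
}
```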

Versus AsyncChannel

To me, AsyncChannel is rarely useful in practice because it doesn't buffer values - it's essentially just an elaborate way to move elements one at a time between two tasks (or to divide work between tasks, for which there are simpler existing alternatives like DispatchQueue.concurrentPerform(iterations:execute:)).

However, with this change AsyncChannel will become increasingly similar to AsyncStream. Confusingly similar, I fear.

AsyncChannel seems to still have some superior attributes even given this proposal, such as explicit multi-user support and well-defined consumer order.

I don't know which one makes the best starting point, but is there a reason that one of these two cannot be made a functional superset of the other, allowing the other to be deprecated?

Callback token

Can you elaborate on why the callback token API is necessary? The native try await source.write(contentsOf: sequence) is clearly the best option, and I see the utility of try source.write(contentsOf:onProduceMore:) for interfacing with non-Swift-Concurrency code, but the callback token bit seems odd.

It also seems to open up an error case that the other two don't: what if the producer receives the enqueueCallback result but fails to enqueue the callback?
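As I read the proposal, the token flow looks roughly like the sketch below (type and method names are taken from the proposal draft - `WriteResult`, `enqueueCallback(callbackToken:onProduceMore:)` - so treat them as illustrative, not final). The error case in question is the path where `token` is received but never enqueued:

```swift
// Bridging a non-Swift-Concurrency producer via the proposed token API.
func produceOne(_ element: Int, into source: AsyncStream<Int>.Source) throws {
    switch try source.write(element) {
    case .produceMore:
        // Fast path: demand remains, the producer can write again
        // immediately. No closure is allocated on this path.
        break
    case .enqueueCallback(let token):
        // If a buggy producer returned here without enqueueing,
        // nothing would ever prompt it to resume.
        source.enqueueCallback(callbackToken: token) { _ in
            // Resume the external, non-async producer here.
        }
    }
}
```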

Buffering vs back-pressure

Can you spell out why a new bufferingPolicy: version of makeStream(…) is necessary, as opposed to just adding a back-pressure option to BufferingPolicy? Is it just because the proposed design creates a kind of parallel implementation based around AsyncStream.Source?

I expect it'll be a source of confusion to users of AsyncStream that there are these two options, with very different APIs on the producer side.

Especially given that the proposal itself points out that the existing API isn't ideal and that this is basically implementing a new one, should it perhaps just lean fully into that and propose this new API as a replacement for the old one, and therefore also support the existing buffer policies?

write(…) behaviour

The contentsOf: methods' documentation says:

If there is a task consuming the stream and awaiting the next element then the task will get resumed with the first element of the provided sequence.

I find this oddly confusing, even though technically it's accurate. The first couple of times I read it, it sounded like AsyncChannel's behaviour, with no buffering inside AsyncStream itself. I don't think it's necessary to spell out the intuitive and expected behaviour re. consumers reading items from AsyncStreams - unless there's some subtle implication which this isn't spelling out, like the consumer executing in the producer's task or somesuch (which as far as I know is impossible, but it could still be confusing to those not completely familiar with Swift Concurrency).

I think it's more important to remind readers that write(…) will eagerly consume elements from the given sequence into the stream's internal buffer, up to the high-water mark, only blocking [the producer] if it can't immediately fit all elements from the sequence. To help ensure producer authors don't see only one of those two behaviours in their tests and forget about the other possibility.
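A sketch of the two producer-side behaviours described above, assuming the proposed `makeStream(of:backpressureStrategy:)` and async `write(contentsOf:)` spellings (watermark values are illustrative, not from the proposal):

```swift
let (stream, source) = AsyncStream.makeStream(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)

// Fits within the high watermark: elements are buffered eagerly and
// the call returns without suspending the producer.
try await source.write(contentsOf: [1, 2])

// Doesn't fit: elements are buffered up to the high watermark and the
// producer suspends until the consumer drains the buffer below the
// low watermark.
try await source.write(contentsOf: [3, 4, 5, 6])
```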

If the asynchronous stream already terminated then this method will throw an error indicating the failure.

What if the stream terminates while write(…) is running, e.g. because the consumer lets the stream deinit?

Water level

The existing AsyncStream API reports the buffer capacity remaining - this new API doesn't seem to have any equivalent functionality? It can be useful to have that available to producers so that they can e.g. log the value, for debugging.

A more independent API for this could be even more useful, such as a way to register a closure that's called whenever the buffer usage changes. This is important for cases like charting that want to see the exact time of each buffer usage change.

Water marks

Could the low & high be expressed through a RangeExpression<Int>? That'd make it possible to have half-open settings, like:

  • .watermark(...10) for an implicit low water mark of 0.
  • .watermark(5...) for an unbounded buffer; a somewhat special but useful case of a stream that simply endeavours to pre-buffer at least 5 items and won't eagerly try to buffer Int.max items but also will never block producers from writing.

It'd also allow a completely open setting, via UnboundedRange_, which is arguably defeating the point (since it's equivalent to the existing unbounded BufferPolicy) but it could be useful as an option for code that [only] sometimes wants that unbounded, non-eager-producing behaviour and wants to use a single version of the AsyncStream producer API. Could be useful in tests, for example, to make the producer-consumer interactions more deterministic. (It's also possible with the API as currently proposed anyway, by passing 0 and Int.max as the watermarks.)

It'd make sense to change the enum value to buffer or buffered in this case, too.
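A hypothetical spelling of the range-based idea, assuming the strategy type is nested as `AsyncStream.Source.BackpressureStrategy` with a `.watermark(low:high:)` case to delegate to - purely illustrative, not proposed API:

```swift
extension AsyncStream.Source.BackpressureStrategy {
    // .watermark(...10): implicit low water mark of 0.
    static func watermark(_ range: PartialRangeThrough<Int>) -> Self {
        .watermark(low: 0, high: range.upperBound)
    }
    // .watermark(5...): pre-buffer at least 5 items, never block producers.
    static func watermark(_ range: PartialRangeFrom<Int>) -> Self {
        .watermark(low: range.lowerBound, high: .max)
    }
}
```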

Future directions

The proposal already mentions some attractive, additional back-pressure strategies. Might be worth also suggesting a customisable one, which takes some closure or similar to evaluate if/when to prompt producers for more items given the current buffer (which they can inspect to make their decision). That could also handle the specific case of implementing a bounded-memory policy which cannot be handled generically since the effective memory footprint of an arbitrary Element type requires custom logic to determine.
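A purely hypothetical sketch of the customisable strategy floated above - no such API is proposed, and the type and case names here are invented for illustration:

```swift
import Foundation

// Hypothetical: a strategy enum where one case takes a closure that
// inspects the current buffer and decides whether to prompt producers.
enum HypotheticalBackpressureStrategy<Element> {
    case watermark(low: Int, high: Int)
    // Closure returns true when producers should be asked for more.
    case custom(([Element]) -> Bool)
}

// e.g. a bounded-memory policy for Data elements, which a generic
// built-in cannot express: demand more while under 1 MiB buffered.
let memoryBounded = HypotheticalBackpressureStrategy<Data>.custom { buffer in
    buffer.reduce(0) { $0 + $1.count } < 1_024 * 1_024
}
```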

In the future, we could deprecate the current continuation based APIs since the new proposed APIs are also capable of bridging non-backpressured producers by just discarding the WriteResult

Is that strictly true; doesn't the caller also need to specify a Int.max high water mark? Some of the write methods are async and [presumably] block if the stream's buffer is full.

Beyond that nit-pick, I do not like the idea of having two parallel APIs to AsyncStream, as mentioned earlier. So it'd be good to have a clearer and stronger deprecation plan.

8 Likes

First off, thanks for the detailed and great feedback!

AsyncChannel has one difference compared to the proposed AsyncStream interfaces here in that it is an anycast (multi-producer/multi-consumer) asynchronous sequence. It supports multiple producers getting queued up like the new interfaces here but it also supports multiple consumers queuing and will deliver the next value to the first consumer in the Deque (FIFO). There is a value to this when you want to communicate across more than two tasks but I agree a majority of its usefulness is subsumed by this new proposal.

There are two reasons for this callback-token-based API. Firstly, it is the building block of the other two and a useful abstraction layer. More importantly, the motivating factor for making it public API is that it has significant performance gains over the callback-based one: it avoids allocating the closure in the fast case where there is still demand and the producer should write again right away. With the callback-based API you would have to allocate a closure every time, even if it is invoked immediately.

That's okay. There is no state handling that expects the user to enqueue a callback. It is totally valid to just call finish as a result of enqueueCallback, or to just produce more and ignore the whole backpressure mechanism for systems that aren't backpressured themselves.

With the current Continuation and YieldResult types we cannot spell a backpressure-aware API at all. That's why I opted for creating a totally new type (Source) and a new makeStream(backpressureStrategy:) factory method, so that there is no confusion about the methods on a type. I wanted to avoid having an overloaded Continuation type with both yield and write methods, where one could interleave their usage.

I haven't fully convinced myself that the old APIs have no value at all once the new ones exist. There are cases where your underlying system doesn't have backpressure, e.g. a delegate that you bridge into an AsyncStream. In those cases, you can just use the new APIs and ignore the WriteResult; however, there is then no maximum buffer limit anymore. You can simply use the buffer(policy:) APIs from swift-async-algorithms to add one, though. The only thing missing is that you do not get callbacks about dropped elements in this case.
Personally, I think it is best if we keep both APIs and see how they are used over the next year or so and then revisit it.

Thanks for the documentation feedback! Once the review period is over I will make sure to incorporate this!

That's not possible. The stream uses a lock to protect its internal state. When you write new values to the source, we acquire the lock and check whether the stream has terminated. If the consuming task terminates at the same time, it will wait for the lock to be released, and only then will the state transition to finished.

I opted not to expose the buffer level intentionally, because even with the current APIs this is very racy: the moment you get the YieldResult, the buffer might already have changed completely, since consumption happens on a different thread. Introducing an API that gets called on every write and next call is very dangerous, since it must run while holding the lock to produce correct values, which opens it up to extreme performance degradation when a user does long-running work in the closure.

I think the overall idea of debugging capabilities is good but IMO should come after adding those new APIs and we must heavily think about how this will impact performance. Furthermore, the current APIs and newly proposed ones don't have any inlinability which is something that I would like to see further down the line once we have first class locks and a Deque in the stdlib. This would allow us to extract an enormous amount of performance from AsyncStream as well.

We could express those with a ClosedRange. I am happy to hear what others think about this. It would remove one of the preconditions, where we currently check that high > low.

This can be achieved by just chaining a buffered(policy: .unbounded) behind your AsyncStream .
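For reference, that composition might look roughly like this. The `buffer(policy:)` operator comes from swift-async-algorithms and the policy spelling may differ between package versions; `makeStream(of:backpressureStrategy:)` is the proposed API, not shipping yet:

```swift
import AsyncAlgorithms

// A backpressured stream (proposed API)...
let (stream, source) = AsyncStream.makeStream(
    of: Int.self,
    backpressureStrategy: .watermark(low: 2, high: 4)
)

// ...with an explicit unbounded buffer composed behind it, so that
// downstream consumption speed no longer pushes back on the source.
let buffered = stream.buffer(policy: .unbounded)
```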

Same as the debugging capabilities above. I intentionally did not make the backpressure strategy a protocol that you can inject, but rather offer a few built-in ones, for a few reasons. Firstly, the strategy should be almost stateless and must be fast; if we allow users to inject their own, we allow them to do arbitrary computation in a very time-critical context. We did this in NIO, but if I were to design it again I would not make it as customisable. Additionally, from what we saw in NIO and in other ecosystems such as Akka, the watermark strategy is almost the only strategy used. Akka doesn't even provide a different one as far as I know. I would like to start off with the watermark strategy and then see how the ecosystem uses it, and whether there are concrete examples where we want a different one. If in the end we never introduce another strategy, then that's okay in my opinion.

They don't. This is where the WriteResult-based API comes into play: it allows you to fully ignore the WriteResult and just continue producing. You can use this in async contexts as well, by doing this little dance to avoid overload resolution favouring the async write method.

func someAsyncMethod(source: AsyncStream<Int>.Source) async throws {
    // Calling write inside a synchronous closure forces the non-async
    // overload, so the call never suspends for backpressure.
    _ = try { try source.write(1) }()
    _ = try { try source.write(1) }()
}
3 Likes

(disclaimer: I was heavily involved in the problem statement and design of this, so I may be biased)

  • What is your evaluation of the proposal?

To me, this is a must have, therefore strong +1.

  • Is the problem being addressed significant enough to warrant a change to Swift?

Yes. Currently (as in, before this proposal is implemented), AsyncStream is quite dangerous to use because it doesn't enforce backpressure (unless you configure it to drop elements, which is often not acceptable). Especially in server use cases, if Async(Throwing)Stream were used with data fed from the network, it usually means a Denial of Service security vulnerability. This is because it often allows an attacker to feed data faster than it's consumed, which means it balloons in the server's memory (a lack of backpressure). Therefore, until now I cautioned people against touching Async(Throwing)Stream unless they could prove ahead of time that the data fed into the stream is of a small, finite size that is not attacker-controlled. This proposal fixes this issue and therefore addresses my main concern.

It also addresses shortcomings in consumer-to-producer as well as producer-to-consumer cancellation (called "termination" in the proposal) support that are crucial for most AsyncSequences. That's also necessary because if cancellation doesn't work both ways it's pretty easy to have a bunch of Async(Throwing)Streams lying around that will never make progress.

  • Does this proposal fit well with the feel and direction of Swift?

Yes, Swift is a safe language, so we shouldn't have stdlib types that create security vulnerabilities under default usage. This proposal fixes that.

  • If you have used other languages or libraries with a similar feature, how do you feel that this proposal compares to those?

Most serious asynchronous server libraries do backpressure. The ones that don't are in deep trouble. So I think this fits well.

  • How much effort did you put into your review? A glance, a quick reading, or an in-depth study?

Involved in the design of this.

Thanks so much for polishing, finishing and writing all this up @FranzBusch.

5 Likes

Right, but, is there a reason AsyncStream can't support this too? Many classical (and common) patterns involve multiple consumers, e.g. fanning incoming requests out to a worker pool.

Especially if that'll be the only remaining difference, it seems so close as to be weird not to just shoot for that. Unless, of course, there are significant implementation challenges or trade-offs in supporting that?

But just to be completely clear, if the producer doesn't enqueue the callback and doesn't otherwise blindly produce more, then they'll hang forever? Nothing further from AsyncStream will 'kick' them into action again?

That would be a bug in the producer, to be sure - I'm just trying to fully understand the potential for user error on this API.

Composing with AsyncBufferSequence makes sense. Adding notifications for dropped elements seems like a useful addition to AsyncBufferSequence (orthogonal to this proposal).

Only if there's nothing forward-incompatible about the new APIs if it's ultimately decided to deprecate the old ones. e.g. the Source API doesn't report the buffer capacity back to the producer; if existing AsyncStream users are relying on this, they can't use the Source API, and adding that afterwards to the Source API might be awkward (yet more overloads of write?).

Not every use-case for AsyncStream is that sensitive to performance. Those that are can elect to not use any optional aspect, like buffer usage reporting, that is deemed too expensive for them.

There are also ways to implement this that don't happen synchronously [w.r.t. the buffer]. e.g. report (timestamp, buffer usage) asynchronously (but in-order) without any locks held, which is fine for monitoring purposes. Especially if we rule out the notion of using the buffer usage level to influence the behaviour of the stream.

Nonetheless this does seem like something that can be added at any later time without impacting the existing API (other than perhaps a new initialiser to take a callback closure); I'm not saying it has to be in this proposal. I just wanted to point out that it is a potential future direction.

Not necessarily a performance concern, and where it is those use-cases can either use a built-in, known-fast strategy, or ensure their own strategy is implemented performantly (whatever that means for them).

AsyncStream is used for a lot more things than cases like NIO, many of which are essentially not performance-sensitive in this context. I'm all for foolproof APIs, but sometimes it's better to trade some performance here for better performance in other respects (e.g. ensuring lower average memory use through better need prediction).

I agree it's wise to not be eager here w.r.t. potential back-pressure strategies. I'm not advocating for anything more than the proposed low+high watermark system in the initial implementation. I just think it worth acknowledging that there are a diverse range of buffering strategies, including those that cannot possibly be satisfied through isolated built-ins (e.g. RAM-size-constrained buffering of arbitrary types of elements).

Why use Result<Void, Error> and not just Optional<Error>?

Technically equivalent, but Result is semantically more appropriate. It's clearer about which is the good path vs the bad (Void vs Error), whereas Optional is more ambiguous (it's not necessarily for error handling to begin with, and even if it is used that way, typically nil is the error case).

Also, in practice, lots of code exists to work with Result in a compatible manner, and Result offers more purpose-built APIs for working with the errors (e.g. mapError & flatMapError).
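For illustration, the error-path combinators mentioned above, runnable with today's standard library (`StreamError` is an invented placeholder type):

```swift
struct StreamError: Error { let reason: String }

let failed: Result<Void, StreamError> = .failure(.init(reason: "terminated"))

// mapError transforms only the failure path, leaving success untouched.
let annotated = failed.mapError {
    StreamError(reason: "write failed: \($0.reason)")
}

// The Optional<Error> equivalent relies on plain map, where the "good"
// path is nil - arguably less self-describing.
let maybeError: StreamError? = StreamError(reason: "terminated")
let annotatedOptional = maybeError.map {
    StreamError(reason: "write failed: \($0.reason)")
}
```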

4 Likes

this objection strikes me as odd, given that AsyncChannel is a third-party package type, while AsyncStream is a standard library type. sure, AsyncChannel has a lot of users, in the way that jquery or boost have a lot of users. but it doesn't make sense to me to try and design the standard library around coherence with someone's third-party extension.

The reason why AsyncStream is not aiming to become multi-producer/multi-consumer is twofold. First, supporting multiple consumers makes the implementation a ton more complicated. Secondly, the question arises of which multicast flavour AsyncStream should have: should it broadcast all values to all consumers, distribute the values in FIFO order to the consumers, or maybe round-robin them?
You see where I am hinting. All of those are desirable patterns depending on the situation. I think we should make AsyncStream a fast and correct unicast asynchronous sequence and leave the multicasting to an algorithm. We do have plans to add broadcast and balance algorithms to swift-async-algorithms.

Correct. If they retain the Source and just don't do anything, the stream will not get any values, nor will it finish (this is the same way the Continuation works right now). We could never terminate here, because maybe there is just a looooong pause of no values.

I am fully aware that NIO is probably the minority use-case here. However, standard library types have a high bar for including APIs, and we want those types to be as highly performant as possible. As you said, let's start with the watermark strategy here and then see where we are in a year from now.

2 Likes

I don't have anything interesting to add, but I wanted to say that I am very excited about this!

It feels very thought through and balanced to me, and - from a server-side perspective - I can see how this can be a core building block that enables very concise, safe, and correct-by-default data flow in applications (especially with the NIO<->async bridging stuff).

Once these holes are patched (and things like broadcast and other "operators" are built on top) I feel we can have a Combine-like feature set without having to fully subscribe to some RX-ish library or programming-style.

I always miss "normal code" when going into rx/combine land, so this feels like the best of both worlds to me ; )

3 Likes

AsyncChannel, part of swift-async-algorithms, is not 3rd party. It's owned by Apple, and mostly written by Apple (@Philippe_Hausler & @kperryua primarily) but has contributions from a healthy number of community members.

In any case, I tend to think of swift-async-algorithms as just an extension of the formal Swift standard libs, for stuff that's not sufficiently foundational (for all Swift developers) to make it into Swift stdlib or Foundation (yet?), but nonetheless a toolbox of very useful tools. Just like many other of Apple's "non-core" libraries like swift-collections, swift-algorithms, swift-log, swift-argument-parser, swift-crypto, etc. On that basis, it doesn't make sense to me to have confusing or unnecessary duplication of functionality.

Tangent: Apple has way more open-source packages than you probably think - 228 repos right now. Looking through there, there's a tremendous amount of really important libraries in there (for the Swift ecosystem).

SAA is absolutely third-party; it does not ship with the toolchain, and it is independently versioned. that there is a lot of overlap among the contributors of that package and the toolchain is irrelevant.

here are some examples of first party libraries:

  • toolchain modules such as Regex
  • things that ship with the toolchain, such as SPM
  • things that are versioned and released in lockstep with the toolchain, such as swift-syntax.

the line can be a little blurry. for example, swift-markdown is supposed to version itself against the toolchain, but it doesn’t ship with the toolchain, and in my opinion there is also not a lot of reason for that package to synchronize its releases with the toolchain in the first place. another example is swift-syntax which has been first-party for a long time but is gradually moving towards a third-party distribution model. but independently-versioned packages like SAA or swift-collections are not first-party libraries by any means, they are far beyond the gray area between “first” and “third”.

keep in mind that first-party vs. third-party is purely a versioning-and-distribution concept, it is not an indicator of library quality or library impact. its only significance is that things in a first-party distribution group share a versioning domain, and it only makes sense to think about library cohesion within a single versioning domain.

1 Like

That makes sense although I will point out that those same operations are available on Optional with map and flatMap.

Out of context, but...

There is an interesting dichotomy here though:

Result<Never, Never> == Never
Optional<Never> == Void
Result<Void, T> == Optional<T>
Result<Void, Never> == Void

I do think that the fact that Error is the payload of the optional would signify that nil is the success case, and this is an established pattern in callbacks.

func doSomething(completion: (Error?) -> Void)

I know Result was created to move away from these kinds of completion handlers, but for one that only has an error possibility, Optional still works fine.

1 Like

Good point. These could be parameterisations of AsyncStream, but I can see the argument that it'd be better to componentise them (as swift-async-algorithms does).

My lingering concern is the sharp edge that is multiple consumers - nothing is stopping someone from trying to do that with AsyncStream. I like things to be gracefully resilient to programmer error in proportion to how close to the stdlib they are. Since almost everyone uses stdlib & Foundation, there's a wide range of competencies (and experience levels). For that reason, it'd be nice if multiple consumers were reasonably supported even if the behaviour isn't configurable or ideal for every use case (or even any use case - even String has some parts with underwhelming performance, but overall it works well-enough for most folks).

To be clear, I respect your position here. This isn't a deal-breaker for me in terms of supporting the proposal. For whatever my vote is worth anyway. :slightly_smiling_face:

Totally agree. They should make some sacrifices, where necessary, to ensure they can be used in very performance-sensitive cases for their essential purpose. But they shouldn't have to always be maximally performant no matter how you configure or use them - optional behaviour can have worth-while trade-offs.

Anyway, these other potential future directions of course don't have to be included in the proposal. I just wanted to put them out there for folks to consider. Perhaps it was premature to bring them up as part of this proposal's review - consider it a form of flattery that I'm already excited about next steps. :grin:

Ah, a semantics issue, is all this seems to be. I was using "first party" vs "third party" in their conventional senses, i.e. "a third person or organization less directly involved in a matter than the main people or organizations that are involved". You're using it (if I now understand correctly) to mean whether or not something ships with e.g. Xcode. Or something closer to that, anyway. Which is fine.

But by either meaning my point still stands - swift-async-algorithms is a high-profile library that (I assume) a lot of people use (or should). Especially since it is mostly controlled by Apple, and at least some of this proposal's authors are significant contributors to it, I think it'd be great and is within reach to avoid unnecessary redundancy or confusion.

well, the thing about third-party libraries is that they live on a completely different timeline from all the first-party stuff we’ve been talking about. what this means for us specifically is that SAA can release a new version at any time and invalidate all the assumptions you’ve been using so far to guide the design of the first-party stuff. now, SAA might be unlikely to do so, given how widely-used it is. but my point is that it could, because that’s what it means to be a third-party library - you can switch things up at any time, whereas the standard library cannot.

another way to look at it is that if we design AsyncStream specifically so that it does not overlap with AsyncChannel, then that is in a way equivalent to saying that SAA cannot ever deprecate or replace AsyncChannel, and we don’t want to rely on, or more importantly, expect third-party libraries not to change.

2 Likes

Firstly, I think this will be a big improvement on the current API, so generally, as an advancement for AsyncStream, I'm supportive. I'm still not quite sure from the proposal exactly how multiple consumers are handled, though:

  • Supports multiple consumers even though proposal positions it as a unicast asynchronous sequence

More generally though, zooming out and looking at AsyncSequence source types as a whole, my feeling is that the current direction violates the 'progressive disclosure' spirit of Swift.

Specifically, I think it is both inconsistent and unintuitive that 'source' asynchronous sequences within the standard library are either a) single-shot (can only be iterated once) or b) 'any' cast (distribute elements to downstream consumers randomly) by default.

I've been racking my brains for any standard library API that vends a Collection which must only be iterated once – and I can't think of any. In some ways, an unintentional 'any' cast asynchronous sequence is worse than one which fatal errors because you may not catch a consumer missing elements.

There's no reason it has to be this way.

Rx derivative libraries were faced with exactly the same issue, and they made their stock Source types (Subjects) multicast by default. There's nothing in the specification for a Subject that says they must be multicast, but in the interest of progressive disclosure, they seem to have decided that this was the most intuitive way forward. It's just now we take for granted that Subjects are multicast. (You can create a single-shot Subject if you need one for performance.)

Just as there's no reason you can't make a one-shot Collection that fatal errors when you attempt to iterate it twice, there's no reason you shouldn't be able to create single-shot (or 'any' cast) asynchronous sequences that are optimised for performance.

But I question their inclusion in the standard lib.

4 Likes

I think that the backpressure part of the proposal is needed, +1, thanks for your hard work! I'd just add a simpler backpressure strategy because, as has been stated, the watermark is good for server contexts, but maybe unnecessary for simpler tasks.

Regarding the whole proposal, I'd like more clarification, as other commenters have said before me, regarding the multicast limitations.

Currently the lack of a real "replacement" for the Combine pattern in async contexts is creating multiple API-design problems for me. I'm restraining usage of AsyncSequence in our company projects because, for UI interactions, the subscriber/producer pattern is really important.

I was hopeful over the last few months for some AsyncStream love, like the backpressure support, but the limitation on multiple subscribers is a bit of a bummer.

2 Likes

Replying to both concerns raised about multiple-consumer support. My goal with this proposal was to clarify the behaviour of the current Async[Throwing]Stream, since the current implementation has diverging behaviour between the throwing and non-throwing variants. Furthermore, neither of the two provides a true broadcast API, which I think is what most users expect.

I do agree that there is a significant need for broadcasting of AsyncSequences, because that is a very common pattern in applications; however, I am still of the opinion that this should be composed rather than baked into AsyncStream itself. Two reasons for this: first, implementing an efficient broadcasting algorithm is hard and would make the implementation of AsyncStream significantly more complex. Secondly, if our root asynchronous sequence type is always broadcasting, it limits some of its usability in scenarios where one does not want a broadcast sequence.

Lastly, there is an interesting future direction here that we haven't discussed yet. Once ~Copyable and ~Escapable types land, we have to revisit the Sequence and AsyncSequence protocols. This would allow us to spell out a type where func makeAsyncIterator() becomes consuming, enforcing the unicast behaviour at compile time, compared to the proposal's runtime enforcement.
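A hypothetical sketch of that future direction - not valid Swift today, since AsyncSequence does not mark makeAsyncIterator() as consuming, and the type here is invented:

```swift
// Pseudocode: with ~Copyable support, a unicast stream could make a
// second call to makeAsyncIterator() a compile-time error rather than
// a runtime one.
struct UnicastStream<Element>: ~Copyable {
    struct AsyncIterator { /* ... */ }

    // `consuming` uses up the stream value itself, so no second
    // iterator can ever be created from it.
    consuming func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator()
    }
}
```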

10 Likes

This is a very interesting future direction for streams!

I see. I can't disagree with this approach, but a real implementation of it in swift-async-algorithms, with extensive documentation, is still needed ASAP; it would be really useful.

7 Likes