Swift Async Algorithms: Buffer

The other very interesting thing that seems to be emerging here is a general pattern that we're seeing time and time again in these algorithms.

That is, something that conforms (or could conform) to AsyncIteratorProtocol:

public protocol AsyncIteratorProtocol {
associatedtype Element
mutating func next() async throws -> Element?

And something that is what you might call an AsyncSource with a shape similar to:

public protocol AsyncSource {
associatedtype Input
mutating func send(_ input: Input) async // maybe this should throw?
mutating func finish() // maybe this should take an Error?

So then a buffer would be:

public protocol AsyncBuffer: AsyncSource, AsyncIteratorProtocol {}

This shape appears in AsyncBuffer, it appears in the context for AsyncChannel and it also appears in AsyncRelay. I can see other potential uses for it to. So perhaps 'buffer' doesn't describe the versatility of this type.

What appears to be emerging to me is something more akin to a Rx style Subject or what's sometimes called a Processor. The key difference is that AsyncBuffer isn't an AsynchronousSequence, but an AsynchronousIteratorProtocol.

And that's a really interesting difference because when you think of an Rx Subject you know more or less that it must be multicast – at least all the stock ones are. That's a lot of baggage for an implementor of a Subject. With an AsyncBuffer there's no need to worry about that – it just spits out its next element to whoever asks. That makes it so much easier to reason about.

It also makes me wonder if an AsyncBuffer of this shape is the more natural home for the 'top' of a pipeline. I think, understandably, people get very confused why AsyncStream can't be iterated twice and I don't have a good answer for people who ask.

Creating an AsyncBuffer as the top of the pipeline, then feeding that into the init of some kind of multicasting algorithm might make more sense to people. It's clear an iterator is unicast. It also keeps the separation of concerns between source/consumption – something I've heard expressed as an important distinction that needs to be upheld.

1 Like

Just re-read the implementation and the PR that showcases a struct based approach.

I would love to understand the requirement for making the whole buffer pluggable a bit more. This requirement poses significant challenges in implementing this both without races and performant.

Maybe it is the best if we scope this down a bit and go with just a buffer algorithm that can do latest and earliest together with a depth. We can implement this without any actor and it should be very performant. In the future, we can easily extend this with a pluggable buffer.


For the sake of exploration I’m currently working on an implementation State Machine + Storage that allows to pick either an unbounded/oldest/newest policy or a suspending policy.

I’ll post a draft PR later today just to gather feedback.

[update] the storage will conform to a protocol that exposes send/finish/fail/next so we can eventually add new types of storage in the future

1 Like

Would love to see an approach like this. However, I am a bit unsure about the protocol approach. We need to make sure that the protocol is getting specialized (i.e. no existential usage).

1 Like

I’m almost ready to open a draft. Just want to implement some UT (not all) just to make sure I’m not totally wrong before requiring your time.

@FranzBusch what’s your point about the protocol not being an existantial ?

You mean that the AsyncBufferSequence should be generic over the BufferStorage protocol right?

Is the intent being able to « know » the buffering policy just by reading the type returned by « .buffer() » ? Or do you have a specific concern about using an existantial ?

An existential being of the form any AsyncBuffer<Element>. It adds an extra hit at run time, unwrapping the type. As long as AsyncBufferSequence includes the AsyncBuffer as a generic parameter, it should be specialised by the compiler at compile time. i.e. AsyncBufferSequence<Base: AsyncSequence, Buffer: AsyncBuffer>

If that happens though, I think it would be difficult to prevent people from writing their own AsyncBuffers. It pretty much becomes public API. So we'd have to be pretty sure it's correct.

Yep exactly my concern. Existentials bring a serious performance overhead which we must avoid in these algos if we want them being used in high throughput systems.

I agree here we shouldn’t make them public API. My point being here: Rather specialize the common case so that it is as performant as possible even if that means we cannot easily make it customizable.

I would love to see how such an implementation looks and performs compared to the current proposed one.


Interesting. I think that makes implementation of a buffer that handles the four cases @twittemb mentions (unbounded/oldest/newest policy or suspending) a little tougher, but maybe it's possible. Would love to see.

It should be possible to have an internal protocol that abstracts things but it gets specialized by instantiating concrete StateMachines depending on which strategy gets chosen

@Philippe_Hausler another thing that we should change the policy to be a struct with static lets instead of an enum. Otherwise we can never add a new case without breaking API.

That seems reasonable, tbh I don't think anyone needs the cases anyhow (except internally).

One other note; I was noodling about the non actor variant - and I think I came up with a fast and versatile way of accomplishing that. We can "check out" the buffer do the mutation and then "check it back in" and while that is in that state we can have the caller of the push be suspended if another access attempting to push a value comes along. Taking that approach avoids existential casts, actor hops and has a potential at inline specializations.


Interesting, yes. Would this keep the potential of the public protocol/closure API variant in play, too?

I agree. The API is also a bit clunky; the buffer passed to the AsyncSequence.buffer methods should ideally be an @autoclosure. I think we can introduce a scoped API for AsyncLimitBuffer, and when @autoclosure supports arrives for non-async functions, which is quite straightforward, we can generalize the API.


here's my attempt to not use actors -> buffer: implement with custom storages by twittemb · Pull Request #239 · apple/swift-async-algorithms · GitHub

Hi @twittemb, this looks interesting. One thing I'm not sure about is the BufferStorage protocol. My personal feeling is that if the protocol is exposed publicly, the type may as well just accept it as a generic parameter. That would allowing using a regular protocol 'style', rather than the 'functional hooks' one you have here, which is sort of like a type eraser. Would be interesting to know the performance difference, too – if any.

But maybe let's see what @Philippe_Hausler and @FranzBusch are thinking, first!

I have only briefly looked over the PR but I agree that I would not expose the protocol publicly at all. Furthermore, you are right with the closures. They can also pose a significant performance impact. I would have to look at what the compiler optimizes here but my gut feeling is that performance will be better with concrete types that can get inlined

There is the option of specialization hinting; which can definitely be applied here.

Perhaps with the concerns of performance we should codify some benchmarks for comparison. If using the protocol is a huge hit then I think we can loop in the compiler folks that deal with optimization; my vote is to shape the API correctly and then figure out perf from there. The interface and ABI are first principles, performance is important but only takes second place.


I agree with this, but I am unsure if the API needs to involve a protocol at all.

Do you think it is crucial to have a pluggable buffer here?
Maybe we should separate the common cases from the pluggable one?


I've worked on variations of the implementation according to your comments:

From what I can see, there is no big winner in terms of performances.

1 Like