Swift Async Algorithms: Buffer

TBH I am unsure on how to actually enforce exclusive access to pushing a value onto the buffer versus popping a value off for next without at least one actor in the system; accessing the user defined type for defining the buffer from within a lock scope is a non starter in my book.

Here is a draft of the changes but sadly some unit tests fail now due to the change.

@Philippe_Hausler could you elaborate on the « why » we cannot use a managed critical state internally to mutate the buffer safely ?

Is it because the pop operation could be time consuming because of some transformation we want to apply to produce the « Output » ?

Firstly the function is async so the resume after might be on a different thread so that would potentially do something really unsafe with the lock.

Ignoring that issue; the problem is of deadlocks - any user call out with a lock held is dangerous. The foreign code might have a side effect that blocks the same task trying to drain it (specifically the main actor is a common case for that to occur).

No matter how we cut it locking is not ideal… but we need exclusive external access to the two methods. In the end it seems to me that has to mean actor.

Ok thanks for the insight.

Now, very naïve question from me: aren’t we trying to accomplish too many things with the current implementation of the buffer algorithm? By that I mean: buffering + some kind of mapping/reducing operation that would prevent us from having a simpler implementation? (I might not have the bigger picture here :wink:)

The mapping/reducing is managed by how the adopter uses it so we don't really need any extra bookkeeping in that regards. The only real complexity added is perhaps the generics but that is relatively trivial.

Perhaps we ought to look at from the perspective of a potential use case. Something that @twittemb and I were discussing is the value of having a back pressure supporting buffer. A buffer that allows a producer to run ahead of the consumer to a pre-determined limit. This kind of buffer would be used as a smoothing mechanism to increase throughput in a back pressure supporting pipeline.

channel.buffer { ThroughputBuffer(limit: 5) }

There's a few things that can be discussed here:

  1. If programmers choose to use a buffer that increases throughput, they're concerned about performance. Adding an actor as an intermediary here will very often necessitate an actor-hop and adversely impact that throughput. It also runs against the prevailing advice which is to batch operations which are to be run on a separate actor. By their very nature, asynchronous sequences are very 'non-batch like'. Actor-hopping is slow, especially when the actors are on different executors.
  2. Using an actor doesn't actually prevent deadlocks in and of itself. As we will be providing the 'stock' buffers, the custom implementations are likely to have a level of complexity to them in any case. Enforcing that implementors use an actor may in fact create a false sense of security as we already know how confusing people find actor re-entrancy. Quickly, people will be using Checked/UnsafeContinuation to suspend callers, to be resumed later. That requires a relatively good level of understanding from the implementor. So regardless of whether an actor is used or not, the docs will need to stress that a level of care is required to do this right. (For example, by ensuring they resume all their stashed continuations.)
  3. Related to point two, if people are stashing continuations, they'll need to be a cancel() method as otherwise the suspended Tasks won't resume. I'm not sure how needing to call cancel() from an async context may complicate things, but typically the practice has been to make calling cancel synchronous so this may need to be nonisolated.

In summary, I think that if we provide 1) a good base of stock implementations, and 2) a thorough explanation in the docs that this type requires careful implementation, there should be no need to lock people down who wish to implement their own. So I agree with @FranzBusch , this should be Sendable with a mutating pop/push – and an additional cancel.

1 Like

The other very interesting thing that seems to be emerging here is a general pattern that we're seeing time and time again in these algorithms.

That is, something that conforms (or could conform) to AsyncIteratorProtocol:

public protocol AsyncIteratorProtocol {
associatedtype Element
mutating func next() async throws -> Element?

And something that is what you might call an AsyncSource with a shape similar to:

public protocol AsyncSource {
associatedtype Input
mutating func send(_ input: Input) async // maybe this should throw?
mutating func finish() // maybe this should take an Error?

So then a buffer would be:

public protocol AsyncBuffer: AsyncSource, AsyncIteratorProtocol {}

This shape appears in AsyncBuffer, it appears in the context for AsyncChannel and it also appears in AsyncRelay. I can see other potential uses for it to. So perhaps 'buffer' doesn't describe the versatility of this type.

What appears to be emerging to me is something more akin to a Rx style Subject or what's sometimes called a Processor. The key difference is that AsyncBuffer isn't an AsynchronousSequence, but an AsynchronousIteratorProtocol.

And that's a really interesting difference because when you think of an Rx Subject you know more or less that it must be multicast – at least all the stock ones are. That's a lot of baggage for an implementor of a Subject. With an AsyncBuffer there's no need to worry about that – it just spits out its next element to whoever asks. That makes it so much easier to reason about.

It also makes me wonder if an AsyncBuffer of this shape is the more natural home for the 'top' of a pipeline. I think, understandably, people get very confused why AsyncStream can't be iterated twice and I don't have a good answer for people who ask.

Creating an AsyncBuffer as the top of the pipeline, then feeding that into the init of some kind of multicasting algorithm might make more sense to people. It's clear an iterator is unicast. It also keeps the separation of concerns between source/consumption – something I've heard expressed as an important distinction that needs to be upheld.

1 Like

Just re-read the implementation and the PR that showcases a struct based approach.

I would love to understand the requirement for making the whole buffer pluggable a bit more. This requirement poses significant challenges in implementing this both without races and performant.

Maybe it is the best if we scope this down a bit and go with just a buffer algorithm that can do latest and earliest together with a depth. We can implement this without any actor and it should be very performant. In the future, we can easily extend this with a pluggable buffer.


For the sake of exploration I’m currently working on an implementation State Machine + Storage that allows to pick either an unbounded/oldest/newest policy or a suspending policy.

I’ll post a draft PR later today just to gather feedback.

[update] the storage will conform to a protocol that exposes send/finish/fail/next so we can eventually add new types of storage in the future

1 Like

Would love to see an approach like this. However, I am a bit unsure about the protocol approach. We need to make sure that the protocol is getting specialized (i.e. no existential usage).

1 Like

I’m almost ready to open a draft. Just want to implement some UT (not all) just to make sure I’m not totally wrong before requiring your time.

@FranzBusch what’s your point about the protocol not being an existantial ?

You mean that the AsyncBufferSequence should be generic over the BufferStorage protocol right?

Is the intent being able to « know » the buffering policy just by reading the type returned by « .buffer() » ? Or do you have a specific concern about using an existantial ?

An existential being of the form any AsyncBuffer<Element>. It adds an extra hit at run time, unwrapping the type. As long as AsyncBufferSequence includes the AsyncBuffer as a generic parameter, it should be specialised by the compiler at compile time. i.e. AsyncBufferSequence<Base: AsyncSequence, Buffer: AsyncBuffer>

If that happens though, I think it would be difficult to prevent people from writing their own AsyncBuffers. It pretty much becomes public API. So we'd have to be pretty sure it's correct.

Yep exactly my concern. Existentials bring a serious performance overhead which we must avoid in these algos if we want them being used in high throughput systems.

I agree here we shouldn’t make them public API. My point being here: Rather specialize the common case so that it is as performant as possible even if that means we cannot easily make it customizable.

I would love to see how such an implementation looks and performs compared to the current proposed one.


Interesting. I think that makes implementation of a buffer that handles the four cases @twittemb mentions (unbounded/oldest/newest policy or suspending) a little tougher, but maybe it's possible. Would love to see.

It should be possible to have an internal protocol that abstracts things but it gets specialized by instantiating concrete StateMachines depending on which strategy gets chosen

@Philippe_Hausler another thing that we should change the policy to be a struct with static lets instead of an enum. Otherwise we can never add a new case without breaking API.

That seems reasonable, tbh I don't think anyone needs the cases anyhow (except internally).

One other note; I was noodling about the non actor variant - and I think I came up with a fast and versatile way of accomplishing that. We can "check out" the buffer do the mutation and then "check it back in" and while that is in that state we can have the caller of the push be suspended if another access attempting to push a value comes along. Taking that approach avoids existential casts, actor hops and has a potential at inline specializations.


Interesting, yes. Would this keep the potential of the public protocol/closure API variant in play, too?

I agree. The API is also a bit clunky; the buffer passed to the AsyncSequence.buffer methods should ideally be an @autoclosure. I think we can introduce a scoped API for AsyncLimitBuffer, and when @autoclosure supports arrives for non-async functions, which is quite straightforward, we can generalize the API.