Swift Async Algorithms: Buffer

Buffer

Introduction

Buffering is a common mechanism to smooth out demand to account for production that may be able to emit more quickly at some times than the requests come in for values. AsyncStream accomplishes this by offering control over the size of the buffer and the policy in which to deal with values after they exceed that size. That strategy works for many common cases, however it does not work for all scenarios and does not offer a mechanism to adapt other AsyncSequence types to have the buffering property. This proposal aims to offer a type that solves those more advanced cases but still expose it as a simple and approachable interface that is safe to use.

Motivation

There are cases in which buffers can be more aptly implemented via coalescing, or can be more efficiently stored in specialized data structures for the element payloads. Granting advanced control over the way that buffers store values but still providing the safety of how to manipulate those buffers is key to making robust buffering tools for developers.

In this proposal we will use an example of a dungeon game navigation as a stand in for these types of interactions; movement events will take the form of a number of moves north, south, east or west. The movements in this example start off as an AsyncSequence and subsequently iterated to derive additional state for the game, however if the iteration of those events is not as fast as the production then the values must be buffered to avoid dropping events. This contrived example is a stand-in for other more complicated systems and can be used to illustrate the mechanisms required for buffering.

The game example has a move enumeration that has an associated values for directions and combined directions and associated payloads to provide scalar amounts per those directions and a function to add an additional move to produce a sum of the movement vector. Moving east by 2 and then moving west by 1 sums to a movement of east by 1 etc.

enum Move {
	case north(Int)
	case northWest(Int, Int)
	case west(Int)
	case southWest(Int, Int)
	case south(Int)
	case southEast(Int, Int)
	case east(Int)
	case northEast(Int, Int)
	
	func add(_ other: Move) -> Move { ... }
}

This means that when we push additional values into the buffer we only need to store one value for the total movement and a normal array or deque storage would not be the most efficient method to store things. This of course becomes more pertanent when the values being stored are considerably more complex than just a movement enumeration.

However the other part to the complexity is the management of when to push values into the buffer and when to pop values out as well as maintaining concurrency safe state of those values. This is precisely where the buffer algorithm comes in to offer a simple interface for doing so by leveraging the language features for concurrency to ensure safe access.

Proposed Solution

The buffer algorithm comes in a few distinct parts; the algorithm for buffering values, a definition of how to manipulate those values safely, and a common use case scenario based on normal collection based storage.

At the heart of the buffer algorithm is a definition of how to asynchronously store values and how to remove values from that storage. It is not required for those values to be the same type; it is reasonable in more advanced cases for the elements being pushed into the buffer to be differing from the elements being popped off. In the example of the moves for the dungeon game it could be reasonable to push individual Move elements into the buffer but then pop arrays of Move elements off. Furthermore, in some cases it is meaningful to describe failures of the pop function to indicate that some condition has created a failure mode. This means that the buffer type needs to have flexibility for both its output independent to its input as well as being able to be a source of rethrowing failures. Since this buffer is intended for asynchronous operation it means that all values must be manipulated in their own island of concurrency; pushing and popping values must share the same access isolation together. Putting this all together means that the AsyncBuffer definition then is a protocol requiring being an actor with associated types for output and input and requirements for pushing and popping such that the pop method may contribute to the throwing nature of the resulting AsyncSequence.

/// An asynchronous buffer storage actor protocol used for buffering
/// elements to an `AsyncBufferSequence`.
@rethrows
public protocol AsyncBuffer: Actor {
  associatedtype Input: Sendable
  associatedtype Output: Sendable

  /// Push an element to enqueue to the buffer
  func push(_ element: Input) async
  
  /// Pop an element from the buffer.
  ///
  /// Implementors of `pop()` may throw. In cases where types
  /// throw from this function, that throwing behavior contributes to
  /// the rethrowing characteristics of `AsyncBufferSequence`.
  func pop() async throws -> Output?
}

One of the most common buffering strategies is to buffer into a mutable collection and limit by a policy of how to handle elements, this limit can be either unbounded, buffering oldest or buffering newest values. This takes a inspiration directly from AsyncStream and grants developers a mechanism for replicating the buffering strategies for AsyncStream on any AsyncSequence.

public actor AsyncLimitBuffer<Element: Sendable>: AsyncBuffer {
  /// A policy for buffering elements to an `AsyncLimitBuffer`
  public enum Policy: Sendable {
    /// A policy for no bounding limit of pushed elements.
    case unbounded
    /// A policy for limiting to a specific number of oldest values.
    case bufferingOldest(Int)
    /// A policy for limiting to a specific number of newest values.
    case bufferingNewest(Int)
  }
  
  public func push(_ element: Element) async 
  public func pop() async -> Element?
}

Putting those together with an AsyncSequence then grants a more general form for specifying how to create a given buffer and then a more specific version that specifies a limit policy.

extension AsyncSequence where Element: Sendable, Self: Sendable {
  /// Creates an asynchronous sequence that buffers elements using a buffer created from a supplied closure.
  ///
  /// Use the `buffer(_:)` method to account for `AsyncSequence` types that may produce elements faster
  /// than they are iterated. The `createBuffer` closure returns a backing buffer for storing elements and dealing with
  /// behavioral characteristics of the `buffer(_:)` algorithm.
  ///
  /// - Parameter createBuffer: A closure that constructs a new `AsyncBuffer` actor to store buffered values.
  /// - Returns: An asynchronous sequence that buffers elements using the specified `AsyncBuffer`.
  public func buffer<Buffer: AsyncBuffer>(_ createBuffer: @Sendable @escaping () -> Buffer) -> AsyncBufferSequence<Self, Buffer> where Buffer.Input == Element
  
  /// Creates an asynchronous sequence that buffers elements using a specific policy to limit the number of
  /// elements that are buffered.
  ///
  /// - Parameter policy: A limiting policy behavior on the buffering behavior of the `AsyncBufferSequence`
  /// - Returns: An asynchronous sequence that buffers elements up to a given limit.
  public func buffer(policy limit: AsyncLimitBuffer<Element>.Policy) -> AsyncBufferSequence<Self, AsyncLimitBuffer<Element>>
}

public struct AsyncBufferSequence<Base: AsyncSequence & Sendable, Buffer: AsyncBuffer>: Sendable where Base.Element == Buffer.Input { }

extension AsyncBufferSequence: AsyncSequence {
  public typealias Element = Buffer.Output
  
  /// The iterator for a `AsyncBufferSequence` instance.
  public struct Iterator: AsyncIteratorProtocol {
  	public mutating func next() async rethrows -> Element?
  }
  
  public func makeAsyncIterator() -> Iterator
}

Detailed Design

The AsyncBuffer type was specifically chosen to be an actor; that way no matter the access that may occur the stored elements of the buffer is isolated. The buffer can define exactly how the elements behave when pushed as well as how they behave when popped. Returning nil from the pop method means that the buffer does not have any available elements and must be called after additional elements are pushed into it. Any time at which the base of the AsyncBufferSequence returns nil, this means that if the buffer ever returns nil from the pop method that indicates the sequence is complete and returns nil downstream. If pop ever throws that indicates the sequence is in a terminal state and that failure is then rethrown in the iteration.

actor MoveBuffer: AsyncBuffer {
  var currentMove: Move?
  func push(_ move: Move) async {
    if let currentMove {
      currentMove = currentMove.add(move)
    } else {
      currentMove = move
    }
  }
  
  func pop() async -> Move? {
    defer { currentMove = nil }
    return currentMove
  }
}

Since AsyncBuffer types are actors, it is also conceivable that a buffer could be shared among many accessors. This particular pattern (albeit most likely an uncommon design) is legitimate to return a shared buffer from the construction closure; thusly sharing the buffer among all potential instances of sequences using it.

Unlike the general purpose AsyncBuffer the AsyncLimitBuffer cannot be directly constructed - it is instead constructed internally given the specified policy. The type being public means that the generic for the AsyncBufferSequence is exposed and part of the signature; indicating both the type of buffering that is occurring but also allowing for performance optimizations to occur.

This all means that given the following code:

let bufferedMoves = game.moves.buffer { MoveBuffer() }
for await move in bufferedMoves {
  await processMove(move)
}

This will allow for custom buffering of the elements with efficient storage and a simple interface to allow for that customization. This approach gives us the flexibility of custom buffers but the ease of use of a simple call site and retains the safety of how Swift's language level concurrency favors.

Notes on Sendable

Since all buffering means that the base asynchronous sequence must be iterated independently of the consumption (to resolve the production versus consumption issue) both the base AsyncSequence and the element of which need to be able to be sent across task boundaries (the iterator does not need this requirement). The AsyncBuffer itself needs to also be Sendable since it will be utilized in two distinct tasks; the production side of things iterating the base, and the consumption side of things iterating the AsyncBufferSequence itself. Thankfully in the case of the AsyncBuffer actor types are inherently Sendable.

Alternatives Considered

The buffer type was considered to be expected to be a sendable structure, however this meant that mutations of that structure would then be in an isolation that was internal to the buffer itself. We decided that this is perhaps an incorrect design pattern in that it exposes the user to potentials of deadlocks or at best isolation that escaped private implementations.

The buffer protocol definition could potentially throw on the push as well as the pop, and this is still an open point of consideration.

AsyncLimitBuffer could be externally constructible (e.g. the init(policy:) could be exposed as public). This is an open point of consideration, however the utility of which is dubious. With sufficient motivation, that could be exposed at a later point.

3 Likes

Hi @Philippe_Hausler, looks good. Glad to see the flexibility here!

One comment here: As async actor methods are re-entrant is there much benefit to restricting to actor types? If there is a suspension within the push/pop methods they'll need to be some kind of synchronisation anyway. I've actually found actors can be a bit slower than I'd like on occasion. A synchronised class/struct type can be quite a bit faster.

2 Likes

Or make it synchronous:

@rethrows
public protocol AsyncBuffer: Actor {
  associatedtype Input: Sendable
  associatedtype Output: Sendable
  func push(_ element: Input)
  func pop() throws -> Output?
}

But I think I'd prefer the flexibility myself.

EDIT: This wouldn't work actually, as you may want to capture a continuation and resume it later.

Two things I found useful a previous time I was designing this type:

  • a "let someone know I'm full/no longer full/empty/no longer empty" option (callback, secondary sequence of events, w/e)
  • a "buffer until signaled" option (my favorite thing to do with this was plug it into CADisplayLink so it emits one most-recent value per frame)
6 Likes

In general, I am +1 on having a buffer algorithm. I am still going back and forth on the Actor constraint. While it nudges the adopters to proper thread safety it feels overly protective as well. Especially when considering the performance implications that an actor brings with it.
Let’s consider a high throughput pipeline where we wanna have a buffer in the middle to smooth out a temporary slow consumer. The current proposal would incur two actors hops which are quite expensive. If we drop the actor requirement and instead add a Sendable requirement we would also get compiler warnings if users are not handling thread safety correctly here. We could even call out an actor approach in the docs.

3 Likes

To remove the extra overhead, the buffering functions would need to be made mutating. Which that would potentially work; however my initial drafts at that end up failing some of the tests. My guess is that we are relying on the extra hops to properly serialize things.

Hi

Regarding the actor discussion, couldn’t we just use ManagedCriticalState to wrap the buffering type provided by the developers and mention in the doc that we take care of concurrent access ?

About the provided « default » buffering mechanism (unbounded, newest, oldest), shouldn’t we prefer a version that suspends the producer when the buffer is full. It matches the philosophy of AsyncChannel in some ways. I have a draft branch that was more or less doing that for an hypothetical « AsyncBufferedChannel » and it worked well.

The issue with using a managed critical state is that you will need to call the push/pop in that critical region - leading to potential deadlocks due to external call-outs in the critical section.

Thinking about this some more. Having the methods be mutating and the type Sendable sounds right to me. The user would instantiate the type and buffer method should take it. Furthermore, the buffer algorithm can use a managed critical state itself to protect the calls to pop/push.
We have been doing something very similar in our NIO AsyncSequence.

2 Likes

I generally have nothing against an actor here, my only problem is that this is exactly a type that I would want to obey FIFO. When the system isn't fast enough to keep up with buffering I do not expect that the newest values to push can win before values that have already waited some time for their turn to access the island.

Also it might just be me and the general solution should be that the user guarantees that this type of buffer is pushed new values sequentially with FIFO being an implementation detail from the outside.


Also why is this a Buffer and not a Stack if we're using push/pop operations which are normally used with stack types?

Those names were chosen for the actions the buffering algorithm was performing; if a new value is iterated from the base that value is pushed onto the front of the storage, and when next is called it pops a value (if present) off to return. However, since we are rethinking the protocol definition of how to buffer then perhaps we should also consider better names for those methods.

@Philippe_Hausler do you have an opinion about the buffering algorithm we should provide as a "general use case" algorithm, that is to say: suspending push VS unbounded buffer or dropping values ?

I think the more advanced use case of a high-water system could be expressed as a custom buffering system; e.g. the push suspends when at the high-water mark. Being that NIO had a need for that I think it is justified to consider on its own merit and could be addressed as a follow-on design provided we can hash out the exact details of buffering in general.

2 Likes

In general, i also don’t have anything against an actor however if we just make the protocol inherit from Sendable and mark the methods async & mutating it is up to the adopter to choose an actor or not.

TBH I am unsure on how to actually enforce exclusive access to pushing a value onto the buffer versus popping a value off for next without at least one actor in the system; accessing the user defined type for defining the buffer from within a lock scope is a non starter in my book.

Here is a draft of the changes but sadly some unit tests fail now due to the change.

@Philippe_Hausler could you elaborate on the « why » we cannot use a managed critical state internally to mutate the buffer safely ?

Is it because the pop operation could be time consuming because of some transformation we want to apply to produce the « Output » ?

Firstly the function is async so the resume after might be on a different thread so that would potentially do something really unsafe with the lock.

Ignoring that issue; the problem is of deadlocks - any user call out with a lock held is dangerous. The foreign code might have a side effect that blocks the same task trying to drain it (specifically the main actor is a common case for that to occur).

No matter how we cut it locking is not ideal… but we need exclusive external access to the two methods. In the end it seems to me that has to mean actor.

Ok thanks for the insight.

Now, very naïve question from me: aren’t we trying to accomplish too many things with the current implementation of the buffer algorithm? By that I mean: buffering + some kind of mapping/reducing operation that would prevent us from having a simpler implementation? (I might not have the bigger picture here :wink:)

The mapping/reducing is managed by how the adopter uses it so we don't really need any extra bookkeeping in that regards. The only real complexity added is perhaps the generics but that is relatively trivial.

Perhaps we ought to look at from the perspective of a potential use case. Something that @twittemb and I were discussing is the value of having a back pressure supporting buffer. A buffer that allows a producer to run ahead of the consumer to a pre-determined limit. This kind of buffer would be used as a smoothing mechanism to increase throughput in a back pressure supporting pipeline.

channel.buffer { ThroughputBuffer(limit: 5) }

There's a few things that can be discussed here:

  1. If programmers choose to use a buffer that increases throughput, they're concerned about performance. Adding an actor as an intermediary here will very often necessitate an actor-hop and adversely impact that throughput. It also runs against the prevailing advice which is to batch operations which are to be run on a separate actor. By their very nature, asynchronous sequences are very 'non-batch like'. Actor-hopping is slow, especially when the actors are on different executors.
  2. Using an actor doesn't actually prevent deadlocks in and of itself. As we will be providing the 'stock' buffers, the custom implementations are likely to have a level of complexity to them in any case. Enforcing that implementors use an actor may in fact create a false sense of security as we already know how confusing people find actor re-entrancy. Quickly, people will be using Checked/UnsafeContinuation to suspend callers, to be resumed later. That requires a relatively good level of understanding from the implementor. So regardless of whether an actor is used or not, the docs will need to stress that a level of care is required to do this right. (For example, by ensuring they resume all their stashed continuations.)
  3. Related to point two, if people are stashing continuations, they'll need to be a cancel() method as otherwise the suspended Tasks won't resume. I'm not sure how needing to call cancel() from an async context may complicate things, but typically the practice has been to make calling cancel synchronous so this may need to be nonisolated.

In summary, I think that if we provide 1) a good base of stock implementations, and 2) a thorough explanation in the docs that this type requires careful implementation, there should be no need to lock people down who wish to implement their own. So I agree with @FranzBusch , this should be Sendable with a mutating pop/push – and an additional cancel.

1 Like