[Pitch] Async buffered channel

Hi everyone,

I know this topic has been discussed several times here and there, but I'd like to officially bring it to the table.

The current implementation of the project offers a way to create a channel that can be used as a communication bus between tasks, particularly when one task produces values and another task consumes them.

The back-pressure management relies on the suspend/resume mechanism of the send(_:) / next() operations: a send(_:) call suspends as long as a consumer has not processed its element.
This has been preferred over a sync send(_:) operation to avoid having to store pending elements in a FIFO that could grow without control.
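
For reference, here is a minimal sketch of that suspending behaviour with AsyncChannel (the element type and values are just for illustration):

import AsyncAlgorithms

let channel = AsyncChannel<Int>()

// Producer: each send(_:) suspends until a consumer has taken the element,
// so no unbounded buffer of elements builds up.
Task {
    for value in 1...3 {
        await channel.send(value)
    }
    channel.finish()
}

// Consumer: iterating the channel resumes the suspended producer.
Task {
    for await value in channel {
        print("consumed", value)
    }
}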

One thing to notice, though, is that AsyncChannel stacks pending continuations in a buffer, waiting for a consumer to resume them.

The usage of such a channel is easy in a full async world where the send(_:) operation is expected to suspend.

In a sync world, we have to wrap the call to send(_:) in a Task to bridge the sync and async worlds. One typical use case would be to use a channel as a way to transform user inputs into side effects. For instance, in a unidirectional data flow architecture, a channel could be the entry point for user Events that would reduce a State into a new State, performing side effects along the way.
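
As a sketch of that bridging (the Event type and the handle(event:) function are made up for illustration):

import AsyncAlgorithms

enum Event { case buttonTapped }

let events = AsyncChannel<Event>()

// Synchronous entry point (e.g. a UI callback): the only way to reach the
// async send(_:) is to hop into a Task, which keeps a continuation pending
// until a consumer picks the element up.
func handle(event: Event) {
    Task {
        await events.send(event)
    }
}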

If we have a burst of user Events and each event takes some time to be processed, then every send(_:) operation will be suspended within its own Task, and the internal stack of continuations will grow.

IMO the lack of a nice solution to bridge both worlds is currently an obstacle to the adoption of AsyncSequence, especially since AsyncChannel's back-pressure management does not prevent the stack of pending continuations from growing.

I've worked on an implementation of an AsyncBufferedChannel (and its throwing counterpart). The only difference from AsyncChannel is that the send(_:) operation does not suspend and elements are buffered in a FIFO. AsyncBufferedChannel is also compatible with a multi-producer / multi-consumer paradigm: elements are not shared but distributed across the consumers.
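
A rough usage sketch of the proposed type (the exact surface, e.g. a finish() mirroring AsyncChannel, is of course up for discussion):

// Hypothetical AsyncBufferedChannel: send(_:) returns immediately and the
// element is enqueued in the internal FIFO.
let channel = AsyncBufferedChannel<Int>()

channel.send(1)
channel.send(2)
channel.finish()

// Each element is delivered to exactly one consumer, even when several
// consumers iterate concurrently.
Task {
    for await value in channel {
        print(value)
    }
}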

The implementations:

@Philippe_Hausler I know you explained several times on this forum why you chose to implement AsyncChannel the way it is. What is your take on that?

Thanks.


I haven't yet looked deeper into your code, but we recently landed a similarly shaped NIOAsyncSequenceProducer.

The sequence we implemented in NIO is a bit more complex to hold but brings several benefits:

  • It exposes back-pressure to the producer
  • It allows you to yield/send a sequence of values to reduce lock contention
  • It uses a Deque for storing the buffered values which is more performant than using an Array

However, it also is slightly different in that it only allows a single Task to consume the values. The reason for this is that AsyncChannel and AsyncStream do not guarantee any ordering for multiple consumers; it is a first-come-first-served principle that we did not want to support.

Our implementation also does a couple of things around cancellation, such as cancelling once the AsyncIterator gets deinited.

On another note, the use-case where I found AsyncChannel to be the most useful right now is to create synchronisation points between two Tasks. Most often used in tests. I have not yet needed a buffered version of this type.

Edit:
On another note, the shape that you created is pretty damn close to what AsyncStream is doing. AsyncStream also allows you to yield elements without suspending and they get buffered up.
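
For comparison, a minimal sketch of that: yield(_:) is synchronous and the elements are buffered according to the buffering policy.

var continuation: AsyncStream<Int>.Continuation!
let stream = AsyncStream(Int.self, bufferingPolicy: .unbounded) { continuation = $0 }

// yield(_:) never suspends; the elements accumulate in the stream's buffer.
continuation.yield(1)
continuation.yield(2)
continuation.finish()

Task {
    for await value in stream {
        print(value) // 1, then 2
    }
}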

True – but I think it's valid to want a simpler API with a single AsyncBufferedChannel object with a non-suspending send(_:) method, instead of having to create an AsyncStream context, escape the continuation, and then send values through the continuation.

Some, such as @stephencelis and @mbrandonw, have tried to make working with AsyncStream a little nicer by creating a static function which returns both the stream and the continuation as a tuple – swift-composable-architecture/ConcurrencySupport.swift at main · pointfreeco/swift-composable-architecture · GitHub. This feels more like a half-measure in the absence of something like AsyncBufferedChannel.

I fully agree that constructing the AsyncStream is not really convenient right now, and the AsyncSequence that we created in NIO also has such a static factory method. This is easy enough to add to AsyncStream. Just needs to go through evolution.

More importantly though, I think there is great value in separating the type that is passed to the consumer and the one passed to the producer. In the AsyncStream case these are the actual stream and the continuation. In our NIO case we named it Source instead of Continuation.
The benefit of separating the types is that you can enforce different Sendability conformances and you can hook the deinits to inform the Continuation/Source once the consumer is done.
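
A minimal sketch of that separation on top of AsyncStream (the Source name and its API are made up for illustration):

// Consumer side: the AsyncStream itself, handed to whoever iterates.
// Producer side: a dedicated Source type; it can carry its own Sendable
// conformance, and its deinit finishes the sequence when the last producer
// reference goes away.
final class Source<Element: Sendable>: Sendable {
    private let continuation: AsyncStream<Element>.Continuation

    init(continuation: AsyncStream<Element>.Continuation) {
        self.continuation = continuation
    }

    func send(_ element: Element) {
        continuation.yield(element)
    }

    deinit {
        continuation.finish()
    }
}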

So I am in agreement with Franz; this is really close (if not an actual isomorphism) to AsyncStream. The only difference is that AsyncStream splits the interface into two types: the actual stream (which is the AsyncSequence) and the continuation, the thing that sends stuff.

Now I would agree that the ergonomics of creating AsyncStreams do not fit all scenarios, and there is a common pattern that does crop up (which tbqh should likely be made easier).

extension AsyncStream {
  /// Creates a stream together with its continuation, so the continuation
  /// does not have to be escaped from the build closure by hand.
  static func create(
    _ elementType: Element.Type = Element.self,
    bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded
  ) -> (AsyncStream<Element>, AsyncStream<Element>.Continuation) {
    var continuation: AsyncStream<Element>.Continuation!
    let stream = AsyncStream(bufferingPolicy: limit) { continuation = $0 }
    return (stream, continuation)
  }
}
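
For illustration, a call site would then read:

let (stream, continuation) = AsyncStream.create(Int.self)

continuation.yield(1)
continuation.finish()

Task {
    for await value in stream {
        print(value)
    }
}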

I have seen this pattern happen quite often (as was pointed out), and it is perhaps something we should formalize to make things easier for folks. The implementation is quite trivial and could even be back-deployed for usage with older versions of Swift without too much headache. I don't think that is a half measure at all; as a matter of fact I think it is a very reasonable move that we should strongly consider making a pitch and landing into the _Concurrency library.


Thanks for your reply @Philippe_Hausler

My main concern was that an ill-suited API would be a blocker for the adoption of AsyncSequence, especially if it is to replace Combine at some point.

So any solution that could improve the API is welcome, and the one you suggest looks a lot like the “pipe()” function from ReactiveSwift.

Before pitching the idea for the _Concurrency library, could you clarify the strategy though, @Philippe_Hausler? On the one hand we have a “Channel” paradigm in this repo with a single entity for sending/iterating; on the other hand we would have another paradigm in the _Concurrency library with two entities for sending/iterating. I’m not saying it’s bad, I’m just concerned that the lack of homogeneity could confuse people.

Personally I see “AsyncStream” as a declarative way of creating an AsyncSequence, in the same way Kotlin has “flow { emit(3) }”. Nevertheless there are still Channels and State/SharedFlow in Kotlin, which are suited for an imperative approach.

Thanks.


Should we make the returned tuple a struct as well?

Just one more note: we also wrote such an extractor to get the continuation - it must be a very, very common thing and it would definitely merit inclusion as a standard solution.

Talking about pipe(), it is a common name for those who have a Unix background (which is basically most who are not Windows-based in reality), and then you have a read and a write end of the pipe - just as with AsyncStream.