[Pitch] Async buffered channel

Hi everyone,

I know this topic has been more or less discussed several times here and there but I'd like to officially bring it to the table.

The current implementation of the project offers a way to create a channel that can be used as a communication bus between tasks, particularly when one task produces values and another task consumes them.

The back pressure management relies on the suspension/resume mechanism of the send(_:) / next() operations. The send operations will suspend as long as a consumer has not processed their element.
This has been preferred over a sync send(_:) operation to avoid having to store pending elements in a FIFO that could grow without control.

One thing to notice though is that AsyncChannels stack continuations in a buffer, waiting for a consumer to resume them.

The usage of such a channel is easy in a full async world where the send(_:) operation is expected to suspend.

In a sync world, we have to wrap the call to send(_:) in a Task to bridge sync and async worlds. One typical use case would be to use a channel as a way to transform user inputs into side effects. For instance in a unidirectional data flow architecture, a channel could be the entry point for user Events that would reduce a State into a new State, performing side effects along the way.

If we have a burst of user Events, and each event take some time to be processed then every send(_:) operations will be suspended within their own Task, and the internal stack of continuations will grow.

IMO the lack of a nice solution to bridge both worlds is currently an obstacle to the adoption of AsyncSequence, especially since the AsyncChannel back pressure management does not guarantee a non growth of the pending continuations stack.

I've worked on an implementation of an AsyncBufferedChannel (and its throwing counterpart). The only difference with AsyncChannel is that the send(_:) operation does not suspend and elements are stacked in a FIFO. AsyncBufferedChannel is also compatible with a multi producers / multi consumers paradigm. The elements are not shared and will be distributed across all the consumers.

The implementations:

@Philippe_Hausler I know you explained several times on this forum why you chose to implement AsyncChannel the way it is. What is your take on that ?

Thanks.

5 Likes

I haven't yet looked deeper into your code but we recently landed a similar shape NIOAsyncSequenceProducer.

The sequence we implemented in NIO is a bit more complex to hold but brings several benefits:

  • It exposes back-pressure to the producer
  • It allows you to yield/send a sequence of values to reduce lock contention
  • It uses a Deque for storing the buffered values which is more performant than using an Array

However, it also is slightly different in that it only allows a single Task to consume the values. The reason for this is that AsyncChannel and AsyncStream are not guaranteeing any ordering for multiple consumers and it is a first come first serve principle that we did not want to support.

Our implementation, also does a couple of things around cancellation such as cancelling once the AsyncIterator gets deinted.

On another note, the use-case where I found AsyncChannel to be the most useful right now is to create synchronisation points between two Tasks. Most often used in tests. I have not yet needed a buffered version of this type.

Edit:
On another note, the shape that you created is pretty damn close to what AsyncStream is doing. AsyncStream also allows you to yield elements without suspending and they get buffered up.

True – but I think it's valid to want a simpler API with a single AsyncBufferedChannel object with a non-suspending send(_:) method, instead of having to create an AsyncStream context, escape the continuation, and then send values through the continuation.

Some such as @stephencelis and @mbrandonw have tried to make working with AsyncStream a little nicer by creating a static function which returns both the stream and the continuation as a tuple – https://github.com/pointfreeco/swift-composable-architecture/blob/main/Sources/ComposableArchitecture/Effects/ConcurrencySupport.swift#L174-L238. This feels more as a half-measure in the absence of something like AsyncBufferedChannel.

I fully agree that constructing the AsyncStream is not really convenient right now and the AsyncSequence that we created in NIO has also such a static factory method. This is easy enough to add to AsyncStream. Just needs to go through evolution.

More importantly tough, I think there is great value in separating the type that is passed to the consumer and the one passed to the producer. In the AsyncStream case this is the actual stream and the continuation. In our NIO case we named it Source instead of continuation.
The benefit of separating the types is that you can enforce different Sendability conformance’s and you can hook the deinits to inform the Continuation/Source once the consumer is done.

So I am in agreement with Franz; this is really close (if not an actual isomorphism) to AsyncStream, the only differential is that AsyncStream splits the interface into two types; the actual stream (which is the AsyncSequence) and the continuation; the thing that sends stuff.

Now I would agree that the ergonomics of creating AsyncStreams does not fit all scenarios and there is a common pattern that does crop up (which tbqh should likely be made easier).

extension AsyncStream {
  static func create(_ elementType: Element.Type = Element.self, bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded) -> (AsyncStream<Element>, AsyncStream<Element>.Continuation) {
    var continuation: AsyncStream<Element>.Continuation!
    let stream = AsyncStream(bufferingPolicy: limit) { continuation = $0 }
    return (stream, continuation)
  }
}

I have seen this pattern happen quite often (as was pointed out), and is perhaps something we should formalize to make things easier for folks. The implementation is quite trivial and could even be back deployed for usage with older versions of Swift without too much headache. I don't think that is a half measure at all; as a matter of fact I think it is a very reasonable move that we should strongly consider making a pitch and landing into the _Concurrency library.

7 Likes

Thanks for your reply @Philippe_Hausler

My main concern was that a not suited api would be a blocker for the adoption of AsyncSequence, furthermore if this is to replace Combine at some point.

So any solution that could improve the api is welcome and the solution you suggest looks a lot like the “pipe()” function from ReactiveSwift.

Before pitching the idea for the _Concurrency library, could you clarify @Philippe_Hausler the strategy though? On the one hand we have a “Channel” paradigm in this repo with a single entity for sending/iterating. On the other hand we would have another paradigm in the _Concurrency library with 2 entities for sending/iterating. I’m not saying it’s bad, I’m just concerned about the lack of homogeneity that could confuse people.

Personally I see “AsyncStream” as a declarative way of creating an AsyncSequence, in the same way Kotlin has “flow { emit(3) }”. Nevertheless there are still Channels and State/SharedFlow in Kotlin, which are suited for an imperative approach.

Thanks.

2 Likes

Should we make the returned tuple a struct as well?

Just one more note that we also wrote such an extractor to get the continuation - it must be a very, very common thing and definitely would merit inclusion as a standard solution.

Talking about pipe(), it is a common name for those who have a Unix background (which is basically most who are not windows based in reality) and the you have a read and a write end of the pipe - just as asyncstream.

As suggested here, there is now an ongoing pitch aiming to ease the instantiation of an AsyncStream -> [Pitch] Convenience Async[Throwing]Stream.makeStream methods - #30 by twittemb

As already mentioned here, IMO we will have 2 colliding instantiation/usage paradigms in the standard lib and this lib:

  • on the one hand, AsyncChannel is an all-in-one type that allows sending and iterating
  • on the other hand, AsyncStream (with this pitch) would separate the sending and iteration mechanisms into 2 distinct types.

IMO AsyncChannel and AsyncStream belong to the same family of « bridging » types (allow async communication in an imperative way), and it would make sense to provide a common pattern for their usage, whether it is an all-in-one type for both or 2 types for sending/iterating.

I think that before releasing the v1 of this repo, we have to make a strong statement about that (perhaps AsyncChannel should be 2 types) so that the future variants of Channel can follow the same path.

@Philippe_Hausler @FranzBusch what do you think ?

I wanted to add something important IMO to this discussion. In one of my projects I use AsyncStream as a bridge between a view and an async state machine. The stream is used as an entry point for user events.

I have implemented an extension on AsyncStream that allows to escape the continuation and use it in the view (just like mentioned in this thread and like @FranzBusch pitched on the forum).

While doing so I figured out that the stream cannot be consumed by 2 successive tasks (not concurrently) ... once the first task is cancelled, the next one will receive nil, meaning the stream is finished.

  let (input, output) = AsyncStream<Int>.pipe()

  input.yield(1)
  input.yield(2)
  input.yield(3)

  let task = Task {
    for await element in output {
      print(element) // will print 1, 2, 3
    }
  }

  // later in the application flow (once 1, 2 and 3 are consumed) the task is cancelled
  task.cancel()

  input.yield(4)
  input.yield(5)

  var iterator = output.makeAsyncIterator()
  let value4 = await iterator.next() // will return nil

I guess this behaviour is expected (looking at the source code, it seems so), and I just missed that until now but I think many developers will get confused by that also. I still think we need an equivalent to AsyncChannel that offers a non suspending send(_:) operation (using a buffer) and allows multi-consumption.

@Philippe_Hausler @FranzBusch Am I missing something ?

Isn't non-suspending send just an AsyncStream.Continuation?

Yep it is, but AsyncStream ends as soon as its consumer is cancelled ... which is the problem in my case.

AsyncStream is a unicast AsyncSequence for the most part even if it doesn’t strictly implement. I think what you want to achieve is an AsyncStream + broadcast(). This should give you the behavior you want

I guess it should since the cancellation will be handled at the broadcast level. There will be only one iterator for the AsyncStream.

Exactly, this sounds exactly what you want and is one of the big use cases behind broadcast()

Indeed.

The behaviour of AsyncChannel is kinda weird as it is a unicast sequence accepting multiple consumers though

AsyncChannel is not unicast. It implements a simulcast pattern. Something like multi producer multi consumer but using a FIFO strategy on the consumer side.

1 Like