AsyncChannel: should we allow buffering?

Hi,

In the current implementation of AsyncChannel, the sending operation works this way:

  • if there is no consumer, the function suspends and resumes once a consumer requests the next element
  • if a consumer is already waiting, the function suspends and immediately resumes with the element

Would there be interest in introducing a buffer (with a constrained size) that loosens the suspension a bit? Kotlin offers this kind of channel:

  • if no consumer and the buffer is not full -> the function suspends, buffers the element, and immediately resumes
  • if no consumer and the buffer is full -> the function suspends until there is a slot in the buffer then buffers the element and resumes
  • if already a consumer (the buffer is empty), the function suspends and immediately resumes with the element

The current implementation of the AsyncChannel would be a specific case where the buffer size is 0.
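
To make the rules above concrete, here is a minimal sketch written as an actor. Everything in it is hypothetical (the name `BufferedChannelSketch`, `receive()`, `bufferedCount`); it is not the AsyncChannel implementation, it ignores cancellation and termination, and it requires a capacity greater than 0 to stay short. It also does not suspend the sender at all when a slot is free.

```swift
// Hypothetical sketch, not the swift-async-algorithms implementation.
// No cancellation or finish() handling; capacity must be > 0.
actor BufferedChannelSketch<Element: Sendable> {
    private let capacity: Int
    private var buffer: [Element] = []
    // Senders parked because the buffer was full, with the element they carry.
    private var pendingSends: [(element: Element, continuation: CheckedContinuation<Void, Never>)] = []
    // Consumers parked because the buffer was empty.
    private var pendingReceives: [CheckedContinuation<Element, Never>] = []

    init(capacity: Int) {
        precondition(capacity > 0, "this sketch only covers the buffered case")
        self.capacity = capacity
    }

    func send(_ element: Element) async {
        // A consumer is already waiting (so the buffer is empty): hand over directly.
        if !pendingReceives.isEmpty {
            pendingReceives.removeFirst().resume(returning: element)
            return
        }
        // No consumer and the buffer is not full: store the element and return.
        if buffer.count < capacity {
            buffer.append(element)
            return
        }
        // No consumer and the buffer is full: wait until a slot is freed.
        await withCheckedContinuation { continuation in
            pendingSends.append((element: element, continuation: continuation))
        }
    }

    func receive() async -> Element {
        if !buffer.isEmpty {
            let element = buffer.removeFirst()
            // A slot was freed: move one parked sender's element in and resume it.
            if !pendingSends.isEmpty {
                let pending = pendingSends.removeFirst()
                buffer.append(pending.element)
                pending.continuation.resume()
            }
            return element
        }
        // Nothing buffered: wait for the next send.
        return await withCheckedContinuation { continuation in
            pendingReceives.append(continuation)
        }
    }

    var bufferedCount: Int { buffer.count }
}
```

With a capacity of 2, two consecutive sends return without waiting for a consumer, and a third one would wait until `receive()` frees a slot.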

@Philippe_Hausler @FranzBusch do you see a use case for that ?

thanks.

For reference https://kotlinlang.org/docs/channels.html#buffered-channels

That sounds reasonable to me, and these are also the semantics that Go's channels exhibit, AFAIR.

Nit: In the "no consumer, buffer not full" case I don't think you'd suspend the caller at all.

I would also like functionality like this. But how about making the buffer size configurable, say 0 or X? For example, with a buffer size of 1 you could build something like the current value subject in Combine, if I am understanding you correctly?

The buffer would not be a "replay" buffer or a "current value" buffer. This was not my original intent. The intent was more to allow the AsyncChannel to be a bit more relaxed regarding suspension.

@Philippe_Hausler what do you think of that ?

Thanks for explaining.

This makes sense to me, with the caveat that we should make it difficult (impossible?) to create an unbounded buffer size. In my experience those kinds of APIs are a common source of unexpected memory growth.

Agree. We have a rule here: "no unbounded queues allowed". But a small fixed buffer can do wonders for throughput.

In general, I am pro having a buffered variant of this type. Buffers are great!

However, I am opposed to overloading the AsyncChannel to be both the buffered and unbuffered version. A prominent usage that I have seen for AsyncChannel right now relies on the guarantee that it suspends until the element has been consumed. If we allow configuring this via a buffer, it becomes very hard to reason about whether the call to send will suspend.
My personal preference here would be to create two separate types so that you can reason about what happens by just seeing the type name.

Lastly, the implementation of AsyncChannel is a bit more complicated and mixing in different behaviours depending on the chosen buffer would make that even more complicated.

Thanks @FranzBusch for the insight.

@Philippe_Hausler I do not have a prototype. I wanted to gather feedback first.

For now we can say:

  • there is an interest for that type of Channel
  • that would bring complexity to the implementation of AsyncChannel
  • it seems reasonable to introduce a dedicated type like AsyncBufferedChannel or so that would take a buffer size as an init param.

My concern with another type is that it will overlap with AsyncChannel when the buffer size is 0.

[EDIT] won’t overlap if we enforce a buffer size > 0

Do you concur ?

If this is a different type from AsyncChannel then it is independent from v1.

Your points are correct. I agree that if it is a different type then most assuredly it is a 1.1 thing. If it is just a "knob to adjust" in the initializer, then that can be added post 1.0, as long as we don't think it is a requirement for folks to immediately think about (giving us flexibility for a quicker march to 1.0).

Honestly I think it's an optimization/refinement of AsyncChannel. It does not bring something that does not already exist with AsyncChannel (-> allowing 2 tasks to communicate).

It gives me (or someone else) the time to suggest an implementation along with a proposal for a future version.

For the record, here is a draft implementation -> [Channels] implement AsyncBufferedChannel by twittemb · Pull Request #229 · apple/swift-async-algorithms · GitHub
@Philippe_Hausler

I wonder if this kind of buffer can be made into a separate algorithm, as I think this kind of 'flex' will be a useful primitive/component in any pipeline where back-pressure is to be maintained. A kind of 'backpressure buffer'.

So you could create the same effect via:

channel.backpressureBuffer(size: 5)

With the multicasting share sequence, it has utility, too. You may wish to allow some consumers to run ahead and maintain throughput – but only to a point – so you'd do:

channel.share().backpressureBuffer(size: 5)

Now, every concurrent consumer can independently trail production by up to five elements, but after that production is halted.

If it's built in to channel the flex would be forced to the production side, but actually it could be really useful on the consumption side!

Hi

Maybe those 2 things should exist in parallel?

AsyncBufferedChannel<Int>(bufferSize: 5) does not behave exactly the same as AsyncChannel<Int>().buffer(bufferSize: 5)

In the first case, a call to send(_:) (assuming the buffer has a free slot) won’t create a continuation and will return as if it were a non-async function. In the second case we inherit the producer strategy, that is to say the send(_:) function will create a continuation that will be resumed immediately. I don’t know the impact performance/runtime-wise, but there is a slight difference. Perhaps AsyncChannel.send(_:) is not optimized, because we could avoid creating a continuation if there is already an iteration waiting for an element.

Honestly I like the idea of providing an AsyncBufferedSequence that could be applied on any sequence as a "smoothing" mechanism. If we say that AsyncChannel can be reworked to avoid creating unnecessary continuations I guess that would be the right path.

@Philippe_Hausler @FranzBusch what do you think ?

I'd be interested to know if there's a tangible impact in doing this. If so, that would mean it's probably always worth checking whether execution can continue immediately before creating a continuation wherever possible – but that often involves checking some synchronized state, which comes with a small hit itself.

But this post about UnsafeContinuation says that if a continuation is resumed synchronously within the supplied closure, no suspension takes place at all. Suspension only takes place if the continuation hasn't been resumed synchronously within the supplied closure. That suggests to me there would be no benefit to performing a prior check – and maybe even a performance hit doing the unnecessary work.
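
For illustration, this is the shape being discussed: a continuation that is resumed before the closure passed to withUnsafeContinuation returns. The helper name `resumedSynchronously` is made up for this sketch. Whether the runtime really skips the suspension in this case is the claim of the post referenced above; the snippet only shows the pattern and does not measure anything.

```swift
// Hypothetical helper, for illustration only.
func resumedSynchronously() async -> Int {
    await withUnsafeContinuation { (continuation: UnsafeContinuation<Int, Never>) in
        // The continuation is resumed before this closure returns,
        // so the value is already available when the closure exits.
        continuation.resume(returning: 42)
    }
}
```

A caller simply writes `let value = await resumedSynchronously()` and gets 42 back.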

Hmm, interesting. It would indeed mean that we’d be better off composing a base async sequence with an ‘AsyncBufferedSequence’.

Regarding ‘AsyncBufferedChannel’: ‘AsyncChannel’ + ‘AsyncBufferedSequence’.

@Philippe_Hausler can you confirm that ‘withUnsafeContinuation’ + resuming synchronously won’t suspend, and can in the end be used as if it were a non-suspending operation? Thanks.

@tcldr I guess the buffer operator already exists -> Swift Async Algorithms: Buffer

We can already compose it with an AsyncChannel to obtain an AsyncBufferedChannel. Do you agree @Philippe_Hausler ?

Yes, just reading through the proposal now – I think it would work well.

Yes, I really really need this.

I would say the current AsyncChannel is just a specialized version of a buffered channel with count 1.

I find it kind of odd to say "do we need this" when there are languages out there (Go) where concurrency is built entirely around channels. If there is ONE async algorithm that needs to exist, it is buffered channels.

What nobody needs is all the OTHER algorithms in here.

@twittemb I think you meant to say "buffer size 1"; a channel with no buffer cannot exist, as it always needs to hold at least 1 element until it is consumed.