In the current implementation of AsyncChannel, the sending operation works this way:
if no consumer -> the function suspends and resumes once a consumer requests the next element
if already a consumer -> the function suspends and immediately resumes, the element being handed to the consumer
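For reference, here's a minimal sketch of that lock-step behaviour with the existing API (the task structure is only illustrative):

```swift
import AsyncAlgorithms

let channel = AsyncChannel<Int>()

Task {
  // Each send suspends until the iteration below consumes the element,
  // so producer and consumer proceed in lock-step.
  await channel.send(1)
  await channel.send(2)
  channel.finish()
}

Task {
  for await value in channel {
    print(value)
  }
}
```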
Would there be interest in introducing a buffer (with a constrained size) that relaxes the suspension a bit (Kotlin offers this kind of channel):
if no consumer and the buffer is not full -> the function suspends, buffers the element, and immediately resumes
if no consumer and the buffer is full -> the function suspends until there is a slot in the buffer, then buffers the element and resumes
if already a consumer (the buffer is empty) -> the function suspends and immediately resumes, the element being handed to the consumer
The current implementation of the AsyncChannel would be a specific case where the buffer size is 0.
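To make the proposed semantics concrete, here is a hedged usage sketch; AsyncBufferedChannel and its bufferSize initializer are hypothetical names for the buffered variant, not an existing API:

```swift
// Hypothetical buffered variant, for illustration only.
let channel = AsyncBufferedChannel<Int>(bufferSize: 2)

Task {
  await channel.send(1) // no consumer, buffer not full -> buffers and resumes immediately
  await channel.send(2) // no consumer, buffer not full -> buffers and resumes immediately
  await channel.send(3) // buffer full -> suspends until the consumer frees a slot
  channel.finish()
}

Task {
  for await value in channel {
    print(value) // consuming drains the buffer, letting the suspended send(3) resume
  }
}
```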
I would also like functionality like this, but how about letting the caller set the buffer size, say 0 or X? For example, with a buffer size of 1 you could make something like a current value subject in Combine, if I am understanding you correctly?
The buffer would not be a "replay" buffer or a "current value" buffer. This was not my original intent. The intent was more to allow the AsyncChannel to be a bit more relaxed regarding suspension.
This makes sense to me, with the caveat that we should make it difficult (impossible?) to create an unbounded buffer size. In my experience those kinds of APIs are a common source of unexpected memory growth.
In general, I am pro having a buffered variant of this type. Buffers are great!
However, I am opposed to overloading AsyncChannel to be both the buffered and unbuffered version. A prominent usage that I have seen for AsyncChannel right now is the guarantee that it suspends until the element has been consumed. If we allow configuring this via a buffer, it becomes very hard to reason about whether a call to send will suspend.
My personal preference here would be to create two separate types so that you can reason about what happens by just seeing the type name.
Lastly, the implementation of AsyncChannel is already a bit complicated, and mixing in different behaviours depending on the chosen buffer would make it even more complicated.
Your points are correct. I agree that if it is a different type then it is most assuredly a 1.1 thing; if it is just a "knob to adjust" in the initializer, then it can be added post 1.0, provided we don't think it is a requirement for folks to immediately think about (giving us the flexibility of a quicker march to 1.0).
Honestly I think it's an optimization/refinement of AsyncChannel. It does not bring anything that does not already exist with AsyncChannel (namely, allowing two tasks to communicate).
It gives me (or someone else) the time to suggest an implementation along with a proposal for a future version.
I wonder if this kind of buffer can be made into a separate algorithm, as I think this kind of 'flex' will be a useful primitive/component in any pipeline where back-pressure is to be maintained. A kind of 'backpressure buffer'.
So you could create the same effect via:
channel.backpressureBuffer(size: 5)
With the multicasting share sequence, it has utility, too. You may wish to allow some consumers to run ahead and maintain throughput, but only to a point, so you'd do:
channel.share().backpressureBuffer(size: 5)
Now, every concurrent consumer can independently trail production by up to five elements, but after that production is halted.
If it's built in to the channel, the flex would be forced onto the production side, but actually it could be really useful on the consumption side!
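To illustrate the consumption-side placement, a hedged sketch assuming the share() and backpressureBuffer(size:) operators discussed above exist (neither is a shipped API):

```swift
// Both operators below are hypothetical; the point is only where the buffer sits.
let shared = channel.share()

Task {
  // This consumer may trail production by up to five elements.
  for await value in shared.backpressureBuffer(size: 5) {
    print("buffered consumer:", value)
  }
}

Task {
  // This consumer keeps the strict lock-step behaviour.
  for await value in shared {
    print("lock-step consumer:", value)
  }
}
```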
AsyncBufferedChannel<Int>(bufferSize: 5) does not behave exactly the same as AsyncChannel<Int>().buffer(bufferSize: 5).
In the first case, a call to send(_:) (assuming the buffer has room) won't create a continuation and will return as if it were a non-async function. In the second one we inherit the producer's strategy, that is to say the send(_:) function will create a continuation that will be resumed immediately. I don't know the impact performance/runtime-wise, but there is a slight difference. Perhaps the current AsyncChannel.send(_:) is not optimal, since we could avoid creating a continuation when there is already an iteration waiting for an element.
Honestly I like the idea of providing an AsyncBufferedSequence that could be applied to any async sequence as a "smoothing" mechanism. If we say that AsyncChannel can be reworked to avoid creating unnecessary continuations, I guess that would be the right path.
I'd be interested to know if there's a tangible impact in doing this. If so, that would mean it's probably always worth checking whether execution can continue immediately before creating a continuation wherever possible, but that often involves checking some synchronized state, which comes with a small hit itself.
But this post about UnsafeContinuation says that if a continuation is resumed synchronously within the supplied closure, no suspension takes place at all; suspension only happens if the continuation hasn't been resumed by the time the closure returns. That suggests to me there would be no benefit to performing a prior check, and maybe even a performance hit from doing the unnecessary work.
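For reference, the pattern under discussion is something like this minimal sketch; whether the synchronous resume really costs nothing is exactly the open question:

```swift
// The continuation is resumed synchronously inside the closure passed to
// withUnsafeContinuation, so (per the post quoted above) the call should
// behave, in practice, like a non-suspending operation.
func resumeSynchronously(_ value: Int) async -> Int {
  await withUnsafeContinuation { continuation in
    continuation.resume(returning: value)
  }
}

// let x = await resumeSynchronously(42) // x == 42, with no actual suspension expected
```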
@Philippe_Hausler can you confirm that withUnsafeContinuation + resuming synchronously won't suspend and can be used as if it were, in the end, a non-suspending operation? Thanks.
I would say the current AsyncChannel is just a specialized version of a buffered channel with count 1.
I find it kind of odd to say "do we need this" when there are languages out there (Go) where concurrency is built entirely around channels. If there is ONE async algorithm that needs to exist, it is buffered channels.
What nobody needs is all the OTHER algorithms in here.
@twittemb I think you meant to say "buffer size 1", a channel with no buffer cannot exist - it always needs to hold at least 1 element until it is consumed.