I wonder if this kind of buffer can be made into a separate algorithm as I think this kind of 'flex' will be a useful primitive/component in any pipeline where back-pressure is to be maintained. A kind-of 'backpressure buffer`.
So you could create the same effect via:
channel.backpressureBuffer(size: 5)
With the multicasting share
sequence, it has utility, too. You may wish to allow some consumers to run ahead and maintain throughput – but only to a point – so you'd do:
channel.share().backpressureBuffer(size: 5)
Now, every concurrent consumer can independently trail production by up to five elements, but after that production is halted.
If it's built in to channel
the flex would be forced to the production side, but actually it could be really useful on the consumption side!