Swift Async Algorithms Proposal: Broadcast (Previously Shared)

Interesting, I haven't had a chance to take a proper look as I'm on my holidays at the moment :slight_smile: but certainly this is the kind of direction I'm leaning towards. A couple of things that I think would need to be worked out:

  • How we define an AsyncSubject type or equivalent. Personally, I like your direction but it does commit to having a generic failure parameter, so that's probably a bigger discussion and I'm not clear on when that will happen.
  • Whether we need the same machinery as the push based reactive libaries multicast implementations. For example, if you had a theoretical Subject that was a mix of AsyncChannel and the algorithm discussed above (i.e. back pressure compliant), I'm not sure you'd need actually need autoconnect anymore. And personally, I'd really like to see a refCount solution (i.e. a mechanism to discard the upstream iterator when the last consumer leaves) as this is a really useful way of transmitting the current usage status upstream. (I think I give some examples in the proposal above.)

Just catching up on this topic. Thanks @twittemb for laying out the two options of how a broadcast operator can work. Just to recap, we can implement either of these two:

  1. broadcast produces one value at a time and only once all downstream consumed it produces the next one
  2. broadcast keeps an individual buffer per downstream.

I have been thinking about this a bit and IMO we should go with the second option. The problem with the 1. is that a single slow consumer is impacting every other consumer. This is quite hard to debug and will lead to very weird behaviour across a system. The problem with 2. is that we are using a lot of memory and might get killed by the system. However, that is way easier to debug and to fix as well (one could use debounce or a similar algorithm).

Furthermore, 2. can be optimised space wise to only keep a single buffer (Deque) and an index per downstream.

Hi @FranzBusch, I have to say I’m not sure about this direction. As I tried to explain above, option 2 prevents this algorithm being used in a back pressure supporting pipeline, where option 1 does not. In addition option 1 can trivially be augmented with a follow-on/chained algorithm (a buffer) to behave like option 2.

In my opinion It would be unfortunate if we find ourselves with a two tier library with ‘back pressure compliant’ and ‘non back pressure compliant’ components.

I don't think that statement is true. The pipeline is still fully back-pressured. Consumers that do not want to receive new values aren't getting any but consumers that request more will get more.
Having a buffer in a pipeline doesn't mean the back-pressure is suddenly lost. Furthermore, most of the root AsyncSequences are already buffering themselves, e.g. AsyncStream or NIOAsyncSequenceProducer. So in the end we are just moving elements along the pipeline into the next buffer, this is a very normal thing in back pressured systems. For example in a networking stack bytes are moved from the Kernel buffers into user space buffers and further along.

By design, AsyncSequences are always back-pressured because consumers are voicing demand through next and producer can only yield values as a response to next. This is different than other reactive systems where producers are often just calling an onNext() method and shoving values down.
The only place where we really need to look at back-pressure is in root AsyncSequences since we need to transfer the back-pressure events signalled by next() to the underlying system that actually produces the values.

Overall, I think both algorithms are worth having but under slightly different names or maybe as an option to broadcast. Though I really think we need to start with the buffered version and make this the default.

So you’re saying that elements will only be consumed from the source stream as fast as the fastest consumer requests them. All other consumers will have a buffer of their late values?

Yes, that's the gist basically. The fastest consumers dictates the upstream consumption speed.

If no downstream consumer is issuing demand then nothing will be requested from the upstream.

This can be implemented with using continuation as latches to signal from downstream to upstream. It is quite similar to what we have done in the merge or debounce state machines.

Ok, I think this is a very important distinction to follow into the spec because this wasn’t clear to me. The buffer interpretation I understood was that it was filling the buffer as fast as the source would support. This is quite different.

I still think that this sounds very much like a composite of option 1 and a buffer, that would ultimately be more flexible as two algorithms that can support any use case. Especially if we provide convenience member functions for typical defaults.

I agree with you in general, that composition is better and we can 100% make this an internal composition.

I still think that the default broadcast() algorithm should probably buffer by default and provide an option to disable that, e.g. func broadcast(bufferStrategy: BufferStrategy = .unlimited) and you set it to .none if you want to.

Maybe I'm missing something (I've mentioned it somewhere here before) - but wouldn't a Disruptor pattern be a very reasonable option for this [edit: 'this' being 'Broadcast' in general]?

Fundamentally, back pressure is applied (since the ring buffer / dequeue is fixed size) if the producer would manager to 'catch up' the slowest consumer, but there's a single shared buffer and it has this 'batching effect' minimising synchronisation requirements and allowing slow consumers to catch up. It keeps memory footprint fundamentally constant.

I think that the design there provides a nice blend of efficiency and desirable functionality.

This may be possible by being creative with the member functions we define on AsyncSequence. I don't think it would be awful to have the member function construct the composite as a default:

extension AsyncSequence {
  fun broadcast(bufferStrategy: Strategy, ...)  -> AsyncBufferSequence<AsyncBroadcastSequence<Self>>
}

But then also allow manual construction via direct initialisation of AsyncBroadcastSequence via its initialiser for alternative strategies to deal with a slow consumer.

If I understand correctly you would achieve this via prefixing a broadcast sequence with a 'throughput sequence`.

source -> throughput buffer -> broadcast -> consumer

The throughput buffer would allow the source to run ahead n-elements of the consumers which would help to keep consumption saturated. It would be implemented just as your describe, with a ring buffer.

Right, but then you fundamentally end up running a) either the consumers in lockstep, or b) require individual buffering per consumer for the last items - the Disruptor removes that requirement with the shared dequeue and separate individual bookkeeping of 'read cursors'.

I'm quite interested in this data structure, but I'm not sure it applies well to Swift's Async Sequences. A big reason for this is handling cancellation, and also the way Swift's Tasks suspend and resume mean it's not quite a 1-1 mapping afaict, and atomics based algorithms aren't the easiest (understatement) to adapt.

However, I do have some ideas about using Swift Atomics at some point to create a data structure which I hope might be useful as the basis in the future for something in this area though.

1 Like

If I get this right, it means the upstream sequence would be iterated upfront ? like in the buffer algorithm ? which seems to not respect the principle of letting the downstream sequences drive the pace of elements production right ? (which is OK in the buffer algorithm since this is precisely the effect we want)

A 'throughput buffer' still allows the consumer to dictate the pace of production, it just allows the source to run ahead an extra n-elements. So yes, it would require an additional Task but it would still be very much back pressure compliant,

Imagine a source that produces an element every second, and then imagine a consumer that takes 0.1 seconds to consume most elements, but 3 secs to consume every fifth element. i.e (0.1, 0.1, 0.1, 0.1, 3.0). With a throughput buffer that 3 second delay gets 'absorbed' by the throughput buffer so that the source is still producing at its max rate. However, if there's an unexpectedly long delay, the source has its production halted. It's back pressure compliant.

EDIT: I'm assuming a throughput buffer with element count of 5 in the example.

That's true if you take the assumption that the producer is a "hot" producer but in the general case of any AsyncSequence, the call to next() on the upstream sequence triggers a side effect of some sort, and you probably don't want this side effect to be executed upfront. In the case of a buffer that would "linearise" the production of elements, the execution of side effects would not be related to the demand of the end client.

That surely depends on the use case. I think you'd only use a throughput buffer when you need one.

We talked about the composition earlier. After thinking about this a bit, I don't think composing broadcast that awaits for all consumers with a buffer is the same as broadcast that has built-in buffering. The former will basically drain the upstream producer as fast as possible since the buffers just soak up values. The latter will only drain the producer if any of the downstreams is actually consuming. Basically broadcast with built-in buffering has the advantage of knowing how fast the downstreams are. Furthermore, the built-in buffer allows quite a lot of memory saving since it can share one Deque instead of one Deque per downstream.

Yes, you're right. I think that's true. It would need to be an additional 'dial' in the algorithm to achieve that behaviour.

EDIT: That behaviour being, elements are consumed from the source at a rate of the fastest consumer – but no faster.

I wanted to add a little bit more here: It looks like we've identified quite a few different possible behaviours for what we'd want from a multicasting algorithm in terms of source element consumption. To list just some of those identified so far:

  • source consumption at the max rate of the source (no back pressure)
  • source consumption at the rate of the fastest consumer (back pressure)
  • source consumption at the rate of the slowest consumer (back pressure)

In addition, there's obvious behaviours like history and source iterator retention – which due to the tight level of orchestration required – need to managed in the state of the distributing algorithm to ensure cancellation and contiguous delivery of elements are handled correctly.

This is definitely now feeling like a 'family' of algorithms.

The other thing I wonder – and I would love to here the opinion of @FranzBusch and @Philippe_Hausler on this one – is that we still have a situation where our source algorithms (AsyncStream in the standard library) isn't suitable for multi-pass iteration.

It feels to me we are coming up against some of the same problems the designers of the original reactive programming libraries came up against. Can we solve two problems in one here?

Is it sensible for us to consider that actually, the idea of a Subject shaped type in Swift Async Algorithms and its associated multicast(_:) operator is actually a great way of ensuring 1) we are able to implement a range of different multicasting algorithms depending on the use case, and 2) when people reach for a 'source' algorithm, they have a type that is 'multicast by default' and reflects the behaviour that it seems many programmers already intuit.

I'm definitely open minded to alternative solutions, I just haven't seen one that solves these problems as elegantly, and it would seem a shame to exclude what might be an imperfect solution in the absence of something better.

1 Like