Swift Async Algorithms Proposal: Broadcast (Previously Shared)

TL;DR: I’ve implemented a multicasting algorithm that supports back-pressure and – either directly or indirectly – supports all the use cases mentioned in the multicasting thread.

After the previous discussion in the multicast(_:) thread around various use-cases for a multicasting algorithm, I decided to attempt one myself. It was a fun exercise! If anything, it crystallised for me the role that back-pressure plays within asynchronous sequence pipelines and the opportunities it gives rise to.

So with that in mind, I’ve put together a proposal for a shared asynchronous sequence that embraces the natural back-pressure characteristics of Swift’s asynchronous sequences, while still being adaptable to use-cases where back-pressure needs to be released through the use of follow-on sequences.

I hope it’s useful. It would be great to hear your feedback.

Broadcast

Introduction

AsyncBroadcastSequence unlocks additional use cases for structured concurrency and asynchronous sequences by allowing almost any asynchronous sequence to be adapted for consumption by multiple concurrent consumers.

Motivation

The need often arises to distribute the values of an asynchronous sequence to multiple consumers. Intuitively, it seems that a sequence should be iterable by more than a single consumer, but many types of asynchronous sequence are restricted to supporting only one consumer at a time.

One example of an asynchronous sequence that would naturally fit this 'one to many' shape is the output of a hardware sensor. A hypothetical hardware sensor might include the following API:

public final class Accelerometer {
  
  public struct Event { /* ... */ }
  
  // exposed as a singleton to represent the single on-device sensor
  public static let shared = Accelerometer()
  
  private init() {}
  
  public var updateHandler: ((Event) -> Void)?
  
  public func startAccelerometer() { /* ... */ }
  public func stopAccelerometer() { /* ... */ }
}

To share the sensor data with a consumer through an asynchronous sequence you might choose an AsyncStream:

final class OrientationMonitor { /* ... */ }
extension OrientationMonitor {
  
  static var orientation: AsyncStream<Accelerometer.Event> {
    AsyncStream { continuation in
      Accelerometer.shared.updateHandler = { event in
        continuation.yield(event)
      }
      continuation.onTermination = { @Sendable _ in
        Accelerometer.shared.stopAccelerometer()
      }
      Accelerometer.shared.startAccelerometer()
    }
  }
}

With a single consumer, this pattern works as expected:

let consumer1 = Task {
  for await orientation in OrientationMonitor.orientation {
    print("Consumer 1: Orientation: \(orientation)")
  }
}
// Output:
// Consumer 1: Orientation: (0.0, 1.0, 0.0)
// Consumer 1: Orientation: (0.0, 0.8, 0.0)
// Consumer 1: Orientation: (0.0, 0.6, 0.0)
// Consumer 1: Orientation: (0.0, 0.4, 0.0)
// ...

However, as soon as a second consumer comes along, data for the first consumer stops. This is because the singleton Accelerometer.shared.updateHandler is updated within the closure for the creation of the second AsyncStream. This has the effect of redirecting all Accelerometer data to the second stream.
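The redirection can be reproduced without any concurrency at all. The following is a minimal synchronous sketch: Sensor and demonstrateHandlerOverwrite() are hypothetical names invented for illustration, and the only thing Sensor shares with Accelerometer is its single updateHandler slot.

```swift
// Hypothetical stand-in for Accelerometer: a single handler slot, nothing more.
final class Sensor {
  var updateHandler: ((Int) -> Void)?

  func emit(_ value: Int) {
    updateHandler?(value)
  }
}

// Simulates two streams being created one after the other.
func demonstrateHandlerOverwrite() -> (first: [Int], second: [Int]) {
  let sensor = Sensor()
  var first: [Int] = []
  var second: [Int] = []

  // The first "stream" installs its handler and receives a value.
  sensor.updateHandler = { first.append($0) }
  sensor.emit(1)

  // The second "stream" overwrites the only slot, so the first goes silent.
  sensor.updateHandler = { second.append($0) }
  sensor.emit(2)
  sensor.emit(3)

  return (first, second)
}
```

After the second assignment, every later value reaches only the second handler, which is the behaviour the first consumer observes as its data stopping.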

One attempted workaround might be to vend a single AsyncStream to all consumers:

extension OrientationMonitor {
  
  static let orientation: AsyncStream<Accelerometer.Event> = {
    AsyncStream { continuation in
      Accelerometer.shared.updateHandler = { event in
        continuation.yield(event)
      }
      continuation.onTermination = { @Sendable _ in
        Accelerometer.shared.stopAccelerometer()
      }
      Accelerometer.shared.startAccelerometer()
    }
  }()
}

This comes with another issue though: when two consumers materialise, the output of the stream becomes split between them:

let consumer1 = Task {
  for await orientation in OrientationMonitor.orientation {
    print("Consumer 1: Orientation: \(orientation)")
  }
}
let consumer2 = Task {
  for await orientation in OrientationMonitor.orientation {
    print("Consumer 2: Orientation: \(orientation)")
  }
}
// Output:
// Consumer 1: Orientation: (0.0, 1.0, 0.0)
// Consumer 2: Orientation: (0.0, 0.8, 0.0)
// Consumer 2: Orientation: (0.0, 0.6, 0.0)
// Consumer 1: Orientation: (0.0, 0.4, 0.0)
// ...

Rather than consumers receiving all values emitted by the AsyncStream, they receive only a subset. In addition, if the task of a consumer is cancelled, via consumer2.cancel() for example, the onTermination trigger of the AsyncStream.Continuation executes and stops Accelerometer data being generated for both tasks.

Proposed solution

AsyncBroadcastSequence provides a way to multicast a single upstream asynchronous sequence to any number of consumers.

extension OrientationMonitor {
  
  static let orientation: AsyncBroadcastSequence<AsyncStream<Accelerometer.Event>> = {
    let stream = AsyncStream(bufferingPolicy: .bufferingNewest(1)) { continuation in
      Accelerometer.shared.updateHandler = { event in
        continuation.yield(event)
      }
      Accelerometer.shared.startAccelerometer()
    }
    return stream.broadcast(disposingBaseIterator: .whenTerminated)
  }()
}

Now, each consumer receives every element output by the source stream:

let consumer1 = Task {
  for await orientation in OrientationMonitor.orientation {
    print("Consumer 1: Orientation: \(orientation)")
  }
}
let consumer2 = Task {
  for await orientation in OrientationMonitor.orientation {
    print("Consumer 2: Orientation: \(orientation)")
  }
}
// Output:
// Consumer 1: Orientation: (0.0, 1.0, 0.0)
// Consumer 2: Orientation: (0.0, 1.0, 0.0)
// Consumer 1: Orientation: (0.0, 0.8, 0.0)
// Consumer 2: Orientation: (0.0, 0.8, 0.0)
// Consumer 1: Orientation: (0.0, 0.6, 0.0)
// Consumer 2: Orientation: (0.0, 0.6, 0.0)
// Consumer 1: Orientation: (0.0, 0.4, 0.0)
// Consumer 2: Orientation: (0.0, 0.4, 0.0)
// ...

This does leave our accelerometer running even when the last consumer has cancelled though. While this makes sense for some use-cases, it would be better if we could automate shutdown of the accelerometer when there's no longer any demand, and start it up again when demand returns. With the help of the deferred algorithm, we can:

extension OrientationMonitor {
  
  static let orientation: AsyncBroadcastSequence<AsyncDeferredSequence<AsyncStream<Accelerometer.Event>>> = {
    let stream = deferred {
      AsyncStream { continuation in
        Accelerometer.shared.updateHandler = { event in
          continuation.yield(event)
        }
        continuation.onTermination = { @Sendable _ in
          Accelerometer.shared.stopAccelerometer()
        }
        Accelerometer.shared.startAccelerometer()
      }
    }
    // `.whenTerminatedOrVacant` is the default, so we could equally write `.broadcast()`
    // but it's included here for clarity.
    return stream.broadcast(disposingBaseIterator: .whenTerminatedOrVacant)
  }()
}

With .whenTerminatedOrVacant set as the iterator disposal policy (the default), when the last downstream consumer cancels, the upstream iterator is dropped. This triggers AsyncStream's onTermination handler, shutting off the Accelerometer.

Now, with AsyncStream composed with AsyncDeferredSequence, any new demand triggers the re-execution of AsyncDeferredSequence's closure, the restart of the Accelerometer, and a new sequence for AsyncBroadcastSequence to share.

Configuration Options

AsyncBroadcastSequence provides two conveniences to adapt the sequence for the most common multicast use-cases:

  1. As described above, a configurable iterator disposal policy that determines whether the shared upstream iterator is disposed of when the consumer count falls to zero.
  2. A history feature that allows late-coming consumers to receive the most recently emitted elements prior to their arrival. One use-case could be a UI that is updated by an infrequently emitting sequence. Rather than wait for the sequence to emit a new element to populate an interface, the last emitted value can be used until such time that fresh data is emitted.
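As a rough illustration of the history option, the sketch below keeps only the most recent elements up to a fixed capacity. HistoryBuffer is a hypothetical helper written for this example; it is an assumption about how such a buffer could behave, not the proposal's implementation.

```swift
// Hypothetical helper: retains at most `capacity` of the most recent elements.
struct HistoryBuffer<Element> {
  private(set) var elements: [Element] = []
  let capacity: Int

  init(capacity: Int) {
    precondition(capacity >= 0, "capacity must not be negative")
    self.capacity = capacity
  }

  // Records a newly emitted element, discarding the oldest ones if needed.
  mutating func record(_ element: Element) {
    guard capacity > 0 else { return }  // a history of zero stores nothing
    elements.append(element)
    if elements.count > capacity {
      elements.removeFirst(elements.count - capacity)
    }
  }
}
```

A late-coming consumer would be handed the contents of elements as its prefix before receiving live values.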

Detailed design

Algorithm Summary:

The idea behind the AsyncBroadcastSequence algorithm is as follows: Vended iterators of AsyncBroadcastSequence are known as 'runners'. In each iteration cycle, runners compete in a race to grab the next element from the base iterator. The 'winner' of an iteration cycle returns the element to the shared context, which then supplies the result to later finishers. Once every runner has finished, the current cycle completes and the next iteration can start. This means that runners move forward in lock-step, only proceeding when the last runner in the current iteration has received a value or has cancelled.

AsyncBroadcastSequence Iterator Lifecycle:

  1. Connection: On connection, each 'runner' is issued with an ID (and any prefixed values from the history buffer) by the shared context. From this point on, the algorithm will wait on this iterator to consume its values before moving on. This means that until next() is called on this iterator, all the other iterators will be held until such time that it is, or the iterator's task is cancelled.

  2. Run: After its prefix values have been exhausted, each time next() is called on the iterator, the iterator attempts to start a 'run' by calling startRun(_:) on the shared context. The shared context marks the iterator as 'running' and issues a role to determine the iterator's action for the current iteration cycle. The roles are as follows:
    - FETCH: The iterator is the 'winner' of this iteration cycle. It is issued with the shared base iterator, calls next() on it, and once it resumes returns the value to the shared context.
    - WAIT: The iterator hasn't won this cycle, but was fast enough that the winner has yet to resume with the element from the base iterator. Therefore, it is told to suspend (WAIT) until such time that the winner resumes.
    - YIELD: The iterator is late (and is holding up the other iterators). The shared context issues it with the value retrieved by the winning iterator and lets it continue immediately.
    - HOLD: The iterator is early for the next iteration cycle. So it is put in the holding pen until the next cycle can start. This is because there are other iterators that still haven't finished their run for the current iteration cycle. This iterator will be resumed when all other iterators have completed their run.

  3. Completion: The iterator calls cancel on the shared context which ensures the iterator does not take part in the next iteration cycle. However, if it is currently suspended it may not resume until the current iteration cycle concludes. This is especially important if it is filling the key FETCH role for the current iteration cycle.
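The role assignment described above can be modelled as a small synchronous state machine. This is a deliberately simplified sketch, not the implementation in the PR: RaceContext, fetchCompleted(by:value:) and the Int element type are assumptions made for illustration, and suspension, cancellation, history and the hand-off of the base iterator are all left out.

```swift
// The role issued to a runner for the current iteration cycle.
enum Role: Equatable {
  case fetch        // winner: pulls the next element from the base iterator
  case wait         // winner has not resumed yet: suspend until it does
  case yield(Int)   // latecomer: receives the element already fetched
  case hold         // already served this cycle: parked until the next one
}

// Hypothetical, simplified model of the shared context (Int elements only).
struct RaceContext {
  private enum Phase { case idle, fetching, fetched(Int) }

  private var phase: Phase = .idle
  private let runners: Set<Int>        // IDs of all connected runners
  private var waiting: Set<Int> = []   // runners suspended on the winner
  private var finished: Set<Int> = []  // runners already served this cycle

  init(runners: Set<Int>) {
    self.runners = runners
  }

  // Called by a runner at the start of next(); returns the runner's role.
  mutating func startRun(_ id: Int) -> Role {
    if finished.contains(id) { return .hold }
    switch phase {
    case .idle:
      phase = .fetching
      return .fetch
    case .fetching:
      waiting.insert(id)
      return .wait
    case .fetched(let value):
      finished.insert(id)
      completeCycleIfPossible()
      return .yield(value)
    }
  }

  // Called by the winner once the base iterator has produced `value`.
  mutating func fetchCompleted(by id: Int, value: Int) {
    phase = .fetched(value)
    finished.formUnion(waiting)  // waiters resume with the same value
    waiting.removeAll()
    finished.insert(id)
    completeCycleIfPossible()
  }

  // Starts a fresh cycle once every runner has been served.
  private mutating func completeCycleIfPossible() {
    if finished == runners {
      finished.removeAll()
      phase = .idle
    }
  }
}
```

With runners 1, 2 and 3, runner 1 is issued FETCH and runner 2 WAIT. Once runner 1 reports a value, a second call from runner 1 is answered with HOLD, runner 3 receives YIELD with that value, and the cycle resets so that the next caller is issued FETCH again. In this model a held runner simply calls startRun(_:) again after the reset, whereas the real algorithm would resume it.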

AsyncBroadcastSequence

Declaration

public struct AsyncBroadcastSequence<Base: AsyncSequence> where Base: Sendable, Base.Element: Sendable

Overview

An asynchronous sequence that can be iterated by multiple concurrent consumers.

Use an asynchronous broadcast sequence when you have multiple downstream asynchronous sequences with which you wish to share the output of a single asynchronous sequence. This can be useful if you have expensive upstream operations, or if your asynchronous sequence represents the output of a physical device.

Elements are emitted from an asynchronous broadcast sequence at a rate that does not exceed the consumption of its slowest consumer. If this kind of back-pressure isn't desirable for your use-case, AsyncBroadcastSequence can be composed with buffers – either upstream, downstream, or both – to achieve the desired behavior.

If you have an asynchronous sequence that consumes expensive system resources, it is possible to configure AsyncBroadcastSequence to discard its upstream iterator when the connected downstream consumer count falls to zero. This allows any cancellation tasks configured on the upstream asynchronous sequence to be initiated and for expensive resources to be terminated. AsyncBroadcastSequence will re-create a fresh iterator if there is further demand.

For use-cases where it is important for consumers to have a record of elements emitted prior to their connection, an AsyncBroadcastSequence can also be configured to prefix its output with the most recently emitted elements. If AsyncBroadcastSequence is configured to drop its iterator when the connected consumer count falls to zero, its history will be discarded at the same time.

Creating a sequence

init(
  _ base: Base,
  history historyCount: Int = 0,
  disposingBaseIterator iteratorDisposalPolicy: IteratorDisposalPolicy = .whenTerminatedOrVacant
)

Constructs an asynchronous broadcast sequence.

  • history: the number of elements previously emitted by the sequence to prefix to the iterator of a new consumer
  • iteratorDisposalPolicy: the iterator disposal policy applied to the upstream iterator

AsyncBroadcastSequence.IteratorDisposalPolicy

Declaration

public enum IteratorDisposalPolicy: Sendable {
  case whenTerminated
  case whenTerminatedOrVacant
}

Overview

The iterator disposal policy applied by an asynchronous broadcast sequence to its upstream iterator.

  • whenTerminated: retains the upstream iterator for use by future consumers until the base asynchronous sequence is terminated
  • whenTerminatedOrVacant: discards the upstream iterator when the number of consumers falls to zero or the base asynchronous sequence is terminated
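Read as a decision rule, the two policies differ only in whether an empty consumer list counts as a reason to discard the iterator. The sketch below states that rule as a pure function. The enum is redeclared locally so the example stands alone, and shouldDisposeBaseIterator(policy:consumerCount:baseTerminated:) is a hypothetical name, not part of the proposed API.

```swift
// Local copy of the policy cases so that this sketch is self-contained.
enum IteratorDisposalPolicy {
  case whenTerminated
  case whenTerminatedOrVacant
}

// Hypothetical helper: decides whether the upstream iterator should be dropped.
func shouldDisposeBaseIterator(
  policy: IteratorDisposalPolicy,
  consumerCount: Int,
  baseTerminated: Bool
) -> Bool {
  switch policy {
  case .whenTerminated:
    // Only the end of the base sequence releases the iterator.
    return baseTerminated
  case .whenTerminatedOrVacant:
    // Losing the last consumer releases the iterator as well.
    return baseTerminated || consumerCount == 0
  }
}
```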

broadcast(history:disposingBaseIterator:)

Declaration

extension AsyncSequence {

  public func broadcast(
    history historyCount: Int = 0,
    disposingBaseIterator iteratorDisposalPolicy: AsyncBroadcastSequence<Self>.IteratorDisposalPolicy = .whenTerminatedOrVacant
  ) -> AsyncBroadcastSequence<Self>
}

Overview

Creates an asynchronous sequence that can be shared by multiple consumers.

  • history: the number of elements previously emitted by the sequence to prefix to the iterator of a new consumer
  • iteratorDisposalPolicy: the iterator disposal policy applied by an asynchronous broadcast sequence to its upstream iterator

Comparison with other libraries

  • ReactiveX: ReactiveX has the Publish operator, which can be composed with the Connect, RefCount and Replay operators to support various multicasting use-cases. The disposingBaseIterator behavior is applied via RefCount (or the .publish().refCount() chain of operators in RxSwift), while the history behavior is achieved through Replay (or the .share(replay:) convenience in RxSwift).

  • Combine: Combine has the multicast(_:) operator, which along with the functionality of ConnectablePublisher and associated conveniences supports many of the same use cases as the ReactiveX equivalent, but in some instances requires third-party operators to achieve the same level of functionality.

Due to the way a Swift AsyncSequence, and therefore AsyncBroadcastSequence, naturally applies back-pressure, the characteristics of an AsyncBroadcastSequence are different enough that a one-to-one API mapping of other reactive programming libraries isn't applicable.

However, with the available configuration options – and through composition with other asynchronous sequences – AsyncBroadcastSequence can trivially be adapted to support many of the same use-cases, including that of Connect, RefCount, and Replay.

Effect on API resilience

TBD

Alternatives considered

One alternative was to create a one-to-one multicast analog of the operators in existing reactive programming libraries. However, that would mean fighting the back-pressure characteristics of AsyncSequence. Instead, this implementation embraces back-pressure to yield a more flexible result.

Acknowledgments

Thanks to Philippe Hausler and Franz Busch, as well as all other contributors on the Swift forums, for their thoughts and feedback.

This is a great start! There are some interesting tidbits here that might be good to expand upon because this particular item is a sticky and complex concept. It might be really good for us to get a handle of the behavior with some examples in the proposal of its usage. Particularly I think it would be good for folks looking back on this to understand when you need to use .share and when you don't want to use it.

I have a few quibbles with naming but I think we should focus on the functionality of things first. Do the "knobs" of history and disposingBaseIterator encompass all behavior? I think they are missing the concept of how values are distributed between iterations. Particularly this part:

Would it not be reasonable to have a load-balancing scheme where runners compete and then that runner gets the value that is to be consumed and the other runners await the next value to "race" upon? If that is a reasonable approach as a set of options then we either have missing knobs -or- we have a family of algorithms. Personally I feel that it is likely the latter to some extent. If we hold that to be true then a sibling algorithm of "load balance" might also utilize similar configuration schemes (such as the behavior about cancellation/iterator disposal).

Given the current behavior proposed, if a runner is fast and requests a value but another runner is REALLY slow, then won't this starve out that faster consumer? E.g.

let shared = someBase.share()
await withTaskGroup(of: Void.self) { group in
  group.addTask {
    for await item in shared {
      doFastThing(item)
    }
  }
  group.addTask {
    for await item in shared {
      try? await Task.sleep(for: .seconds(3))
      doSlowThing(item)
    }
  }
}

By the current proposal, doFastThing will be called at most once every 3 seconds because it must await the holding state of the other runners, right?

Absolutely, I’ll expand on some of the use cases I mentioned in the other thread and bring them into the ‘motivation’ part of the proposal in some way. Some use cases really depend on ‘accessory’ algorithms to get the full range of possibility, but I’ll try and keep examples within the constraint of what’s available now.

It probably doesn’t encompass all the behaviour we’d like, but via composition with the accessory algorithms mentioned above it would probably cover most use-cases available in the default set of Rx subjects/connectables. (Connect, RefCount, Replay, etc.)

Yeah, that’s an interesting idea! At the moment the algorithm ensures each emission from the upstream iterator is distributed to every downstream consumer, and it occurs in lockstep. You could definitely have a variation where an emission is only distributed to a single downstream consumer – whoever gets there fastest – but, yes, it’s probably different enough to warrant its own algorithm.

One option that probably could be easily incorporated is to allow the fastest runners to continue ahead of the slower runners for some number of iterations. Currently, it’s fixed to one, so the runners run in lock-step. But it can relatively easily be modified to be a configurable number. This creates some flex in the algorithm by allowing faster consumers to move forward at a faster pace than the slower ones.

Couple it with some kind of ‘skip n elements’ algorithm on the consumer end, and you can distribute a load across tasks concurrently. (It would require you to know the number of tasks you wish to distribute across upfront though, so if the use case is better served by the load-balancing algorithm above, it may not be worth complicating it.)
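To show what the consumer-side 'skip n elements' step might look like, here is a synchronous sketch over an ordinary Sequence. slice(forWorker:of:) is a hypothetical helper, and an asynchronous version would follow the same indexing rule: worker k out of n keeps the elements whose position is congruent to k modulo n.

```swift
extension Sequence {
  // Hypothetical helper: keeps the elements at positions worker, worker + workerCount, ...
  func slice(forWorker worker: Int, of workerCount: Int) -> [Element] {
    precondition(workerCount > 0, "workerCount must be positive")
    precondition((0..<workerCount).contains(worker), "worker is out of range")
    return enumerated()
      .filter { $0.offset % workerCount == worker }
      .map { $0.element }
  }
}
```

Each worker sees every element of the broadcast but acts only on its own slice, which is why the worker count has to be known upfront.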

Correct, that would starve the consumer. But you can mitigate that by composing with a buffer on the consumption side to match the use case. It seems in these situations there is a compromise either way, either a) all tasks are held-up by the slowest consumer, or b) a buffer is filled with queued items. But by keeping the buffer separate from the algorithm you’re able to choose exactly which compromise you’d like to make and how.
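As a sketch of what such a consumption-side buffer could look like using only the standard library, the helper below drains its base sequence eagerly into an AsyncStream. eagerlyBuffered(keepingNewest:) is a hypothetical name, and the choice of .bufferingNewest (which discards the oldest buffered elements once the limit is reached) is just one of the possible compromises. Errors from the base sequence simply end the stream, since AsyncStream cannot carry them.

```swift
extension AsyncSequence where Self: Sendable, Element: Sendable {
  // Hypothetical helper: pulls from `self` as fast as it produces and
  // stores up to `limit` of the newest elements for a slower consumer.
  func eagerlyBuffered(keepingNewest limit: Int) -> AsyncStream<Element> {
    AsyncStream(Element.self, bufferingPolicy: .bufferingNewest(limit)) { continuation in
      // The pump task creates demand on the base sequence independently
      // of how quickly the downstream consumer iterates.
      let pump = Task {
        do {
          for try await element in self {
            continuation.yield(element)
          }
        } catch {
          // An upstream error ends the buffered stream without being rethrown.
        }
        continuation.finish()
      }
      // Stop pulling from the base when the consumer goes away.
      continuation.onTermination = { _ in pump.cancel() }
    }
  }
}
```

Placed after the shared sequence in the slow consumer's pipeline, a buffer like this lets the fast consumer proceed while the slow one works through (or drops) queued elements.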

Generally, my feeling is that an algorithm should have back-pressure by default if it makes sense, and then it’s a case of choosing when and where to release it when assembling the sequence pipeline through placement of buffers.

Disruptor in practice?

https://lmax-exchange.github.io/disruptor/

(But with tasks instead of threads)

Interesting, will check it out. Will no doubt be some interesting use cases for it.

I've updated the proposal with additional examples.

I'm also wondering if it may make sense to break out the history part as a separate algorithm. By default, share() is disconnected, it's only when a downstream consumer creates demand and drives the algorithm forward that its history is generated. I can see people doing things like this to work around it:

pullTask = Task {
  for await _ in sharedSequence { }
}

This would 'prime' the shared sequence so that its history is always filled with the latest elements from the base sequence, creating a kind of 'live', or hot, shared sequence.

This is likely a perfectly valid use-case, but it should probably be formalised into a follow-on algorithm. This follow-on algorithm might then be a better home for history, as history might not make much sense on a 'cold' sequence, but quite a bit of sense on a 'hot' sequence.

I think the concept of history is definitely worth its own algorithm.

There is an interesting point here too on naming; share may have other examples in similar systems but I think there might be better names to house these variants; it might be good to enumerate exactly the ideal names for all of the family members. For example - one name that might be interesting to use (which I have made some prototypes of) is broadcast; this immediately implies the concept of "everyone gets the same values at the same time", which might help with the concept of waiting for the others to be ready. Likewise, as I have already mentioned, loadBalance might be a good name for splitting up values according to some sort of strategy.

I would be interested to see what other folks have as for ideas of names in this family.

With regards to extricating history from the algorithm... On investigating further, I'm not sure if there's a good way to do it. We need some way of grabbing the history at the point of connection so it's guaranteed contiguous. It's relatively easy within the synchronised shared context of the algorithm, but cross sequence it's a lot tougher.

History doesn't add much complexity to the algorithm as is, so it may make sense to leave it how it is, and then choose how we expose it in the public facing API. Only exposing it where we feel it makes sense.

I'm not particularly wedded to the name share, and I'm definitely plus one on broadcast as a name for one of the variants. I think it communicates the utility very well.

Whether it's used for this algorithm as is, or the Task augmented variety (to pull down the latest values and keep history topped up even when there are no other consumers), I'm not sure. Maybe relay(history:) could be a good name for that functionality? I can see that one being useful in UI contexts.

I also think loadBalance makes a lot of sense, but perhaps parallelize, split or distribute could work, too.

Some name options: broadcast, share, multicast, publish, distribute, relay, manifold, toMany, fanOut

I also wonder if the multicast concept as a whole might be broad enough that there isn't necessarily a one-to-one mapping with the member function name and the algorithm name. It's complex enough that for some common use-cases, composing a short chain of algorithms might make sense, and then providing a member function that does the assembly of the chain as a convenience.

If so, broadcast might end up being a good home for this kind of assembly.

Going back to this, one further configuration option I've realised may be useful is how the upstream iterator is wrapped. Currently, it's wrapped in a plain old actor, and works very well and seems to be performant, but if you're mainly working on the @MainActor it seems a shame to actor hop between the main thread and cooperative thread-pool needlessly. We might be able to make that configurable in some way.

I've updated the proposal and PR to reflect the broadcast name as it does seem to better reflect the utility. If anyone has any other preferences do chime in!

Also, the more I think about how the algorithm might be used, the less need I believe there is for specialist 'follow-on' algorithms, as pretty much everything required to adapt it to the use cases I can imagine is achievable via prefix/follow-on placement of standard buffers/regular algorithms. (So, no need to have a parallel 'driving' Task if you want to keep history topped up, just use an upstream buffer with the desired history, and configure broadcast with the same history if required.)

Regarding history: I did play around with taking the context/state machine out of the algorithm. The key is that, to provide contiguous history, you need hooks that can tap into the synchronized state of the algorithm. In the end though, the effect on performance and additional complexity made me abandon the idea. In terms of performance, removing the history mechanism entirely resulted in a 1.6% speed-up vs using a history with its count set to zero – but I feel even that could be eliminated with a little extra work.

Thanks for this, it will be great to have sharable sequences. But I wonder, is there a high-level explanation somewhere for why AsyncSequence can't simply always be broadcast? Is that for technical or UX reasons?

It really depends on the asynchronous sequence. Some asynchronous sequences don't have any dependencies and are naturally multicast.

For example, say you have a sequence that emits an incremented number every second, starting at 0 and going up. If you want all your consumers to start from zero – that's fine – use the sequence as is, no issue.

However, if you want your consumers to get the latest number (i.e. a second consumer arrives thirty seconds later so should start with element 30) then you need some way of sharing the original sequence. That's where broadcast would come in.
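The 'start from zero' behaviour of such an unshared sequence can be sketched as follows. Counter and collect(_:) are hypothetical names, the one-second delay is left out, and a limit is added purely so that the example terminates.

```swift
// Hypothetical cold sequence: every iterator counts up from zero on its own.
struct Counter: AsyncSequence {
  typealias Element = Int
  let limit: Int  // stop after this many elements so the sketch terminates

  struct AsyncIterator: AsyncIteratorProtocol {
    let limit: Int
    var current = 0

    mutating func next() async -> Int? {
      guard current < limit else { return nil }
      defer { current += 1 }
      return current
    }
  }

  // Each consumer gets a fresh iterator, and therefore a fresh count.
  func makeAsyncIterator() -> AsyncIterator {
    AsyncIterator(limit: limit)
  }
}

// Drains one independent iteration of the counter.
func collect(_ counter: Counter) async -> [Int] {
  var values: [Int] = []
  for await value in counter {
    values.append(value)
  }
  return values
}
```

Two separate iterations of the same Counter value each produce 0, 1, 2 and so on, no matter when they begin; nothing is shared between them.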

Once something like broadcast is available, you'll then be able to trivially create the equivalent of a PassthroughSubject by, for example, chaining an AsyncStream and an AsyncBroadcastSequence together.

EDIT: The other thing I should add is that once AsyncBroadcastSequence (or its equivalent) and primary associated types for asynchronous sequences are available, it will give API designers the option of making their sequences 'broadcast by default' anyway – so as a user of an API – you won't need to think about it too much. It'll just work.

Thank you for your reply.

Well, that is sort of my question, why do you need that? Why isn't it automatic? Is it because we want different sharing behaviour in different cases, so it's safer to default to no sharing at all? It seems to me that sharing and replaying are at least partly separate questions.

For example, in my use cases I basically always want every consumer, no matter when they start subscribing, to all receive just the values emitted after they subscribed. In a few cases I want a current value subject kind of behaviour, but again for everyone. I personally never really want to replay all the values that were emitted before I subscribed, but I can see why others may want that.

What I can't see though is in what situations you would not want a sequence to be sharable at all.

In the contrived example I gave both sequences are sharable – but their behaviour is different.

The difference is, the sequence that isn't broadcast will always start from zero when iterated by a new consumer. Whereas, the broadcast sequence will start from whatever value it emitted last to other consumers.

Some sequences though are not sharable by default, like AsyncStream, and that can definitely be a bit confusing.

Intuitively, I agree with you, but technically it's hard to break the two things apart, which I imagine is the reason why the Rx libraries have the same coupling.

But in what situations do we actually want non-shared sequences/publishers?

You make a good point. Probably none. None that are publicly exposed anyway.

But I think AsyncStream is the only public asynchronous sequence I can think of with this behaviour. Maybe there's an argument to say that it should be upgraded to be a multicast asynchronous sequence in the future.