Async Algorithms `multicast(_:)` or equivalent

I think I'm getting a bit more clarity on the functionality we're getting here, and it seems to me that broadcast() is the equivalent of the share() operator in Combine. That's a really useful feature to have, but I'll definitely be missing the flexibility that Combine's multicast(_:) operator provides.

I know there's some reluctance to simply reproduce what Combine has done , but I do believe the composability of a Subject coupled with a connection strategy – which Combine's multicast(_:) facilitated – was a powerful combo. Now, I'm not saying all that plumbing necessarily needs to be exposed (although I'd like it if it was), but in terms of thinking about the matrix of possibility for broadcast use-cases – I think it's quite useful.

For me, multicast(_:) in Combine provided a way to compose two things:

  1. Buffering Strategy: Through the choice of subject, be it a hand-rolled ReplaySubject or PassthroughSubject, with a:
  2. Connection Strategy Through the choice the operator immediately following multicast(_:), be it connect(), autoconnect() or a hand-rolled referenceCounted()

To me, that is what multicast(_:) is really all about. Bridging a buffering strategy with a connection strategy. So while on the surface it appears complex, the decomposition of those two things means less algorithms are needed in the long-run.

Having said that, I do agree that it can feel overwhelming on first exposure, so perhaps there is something that can be done to clarify and simplify a subset of that power and flexibility for end users. Roughly, something like:

extension AsyncSequence {
  func broadcast(bufferingStrategy: BufferingStrategy = .last(0), connectionStrategy: ConnectionStrategy = .deferred(reconnects: false)) -> AsyncCompositeBroadcastSequence<Self>
}

enum BufferingStrategy { case last(Int), first(Int) }
enum ConnectionStrategy { case immediate, deferred(reconnects: Bool) }

So what does that actually mean? Well, in terms of use-cases and how you might go about constructing each of them with Combine's API (and assuming there were a ReplaySubject and referenceCounted() operator in Combine) you'd get something like:

  1. broadcast()
    Combine equivalent: multicast { PassthroughSubject() }.autoconnect() or share()
    Use-case: Perhaps something like the QuakeMonitor example where new entries are briefly displayed on a map.*
  2. broadcast(bufferStrategy: .last(1))
    Combine equivalent: multicast { ReplaySubject(1) }.autoconnect()
    Use-case: Some kind of ephemeral view state where, if a view reappears after being dismissed, it should initially show the state of its predecessor
  3. broadcast(connectionStrategy: .deferred(reconnects: true))
    Combine equivalent: multicast { ReplaySubject(1) }.referenceCounted()
    Use case: Distribution of sensor data, perhaps an accelerometer, where its useful to lazily start-up the hardware sensor, and ensure it's shut down when no longer needed.
  4. broadcast(bufferStrategy: .last(50), connectionStrategy: .immediate)
    Combine equivalent: multicast { ReplaySubject(50) }.connect()
    Use case: Logs. Starts immediately and keeps track of last 50 log entries, ready for when a consumer comes along in order to show a reasonable recent history.

* I actually think this would be a more natural fit for a connection strategy of .deferred(reconnects: true), which is interesting in itself.

In summary: I think to provide for the most common use cases, any multicast(_:) equivalent needs to at least provide 1) support for a buffering strategy (at least .last(Int)), and 2) support for a range of connection strategies (at least .immediate, .deferred(reconnects: Bool)).

1 Like