Async Algorithms `multicast(_:)` or equivalent

Hi All,

I understand that there is currently a focus on honing what's already in place, but I would really appreciate a multicast(_:) equivalent now that the AsyncChannel/AsyncThrowingChannel sequences are in place.

As justification: I think we have some great algorithms for transforming, combining and merging sequences, but I think the functionality of a multicast(_:) or equivalent algorithm would help to create some tidy APIs at the source of a sequence pipeline.

What I'd like to achieve is the ability to:

  1. Lazily initialise a source sequence upon connection of the first consumer
  2. Share that sequence with subsequent consumers
  3. Clean up the sequence after the last consumer cancels

This is great for things like accelerometer data or tapping audio data, where you may have multiple interested consumers at any one time, want to start sampling only when a consumer is present, and cease sampling when all interested consumers have gone.

In Combine/Rx this might have been achieved by creating a custom Publisher that handled the initialisation/clean-up piece (achievable with an AsyncStream today), then placing it behind a multicast(_:) operator to share it with multiple consumers, followed by a referenceCounted() operator (not in Combine) to handle discarding it when all the consumers are done.

This makes for really elegant API where consumers can all share a single sequence and not need to concern themselves with how to start or stop whatever the underlying source might be.
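As a hedged sketch of the Combine pattern described above (the upstream accelerometerPublisher is a placeholder, and since referenceCounted() is hand-rolled and not in Combine, the built-in autoconnect() stands in for the connection step):

```swift
import Combine

// A hypothetical upstream publisher standing in for a sensor source.
let accelerometerPublisher = PassthroughSubject<Double, Never>()

// Put the source behind multicast(_:) so all consumers share one upstream
// subscription, then autoconnect() so the connection is made when the first
// subscriber arrives. A hand-rolled referenceCounted() would additionally
// tear the connection down after the last consumer cancels.
let samples = accelerometerPublisher
    .multicast { PassthroughSubject<Double, Never>() }
    .autoconnect()

let consumerA = samples.sink { print("A:", $0) }
let consumerB = samples.sink { print("B:", $0) }
```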


I pushed a WIP branch with something in this territory: [WIP] Broadcast algorithm by phausler · Pull Request #214 · apple/swift-async-algorithms · GitHub

It lazily creates a task to iterate the upstream base. It shares the produced values with subsequent consumers, and it cleans up after the last consumer cancels (or is deallocated). So I think this satisfies precisely the set of needs most common for this shape.

There are a couple of interesting tidbits that fall out of this. Firstly, the base AsyncSequence MUST be Sendable. Secondly, the Base.AsyncIterator does not need to be Sendable, but the Base.Element must be. It also implies there are a couple of different variants in this family of algorithms: one might be a full-on replay algorithm, another might be a "please wait until everyone has consumed the element" algorithm, and I am sure there are likely others.
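To illustrate those constraints (hedged: .broadcast() is the hypothetical spelling of the operator from the WIP PR, and may change):

```swift
// Hypothetical usage of the WIP broadcast algorithm. The base sequence and
// its Element must both be Sendable, because elements cross task boundaries
// to reach each consumer; the base's iterator need not be.
let ticks = AsyncStream<Int> { continuation in
    Task {
        for i in 1...3 {
            continuation.yield(i)
        }
        continuation.finish()
    }
}

let shared = ticks.broadcast() // hypothetical operator from the WIP branch

// Each consumer receives every element produced after it starts iterating;
// the upstream is iterated by a single lazily created task.
Task { for await tick in shared { print("A:", tick) } }
Task { for await tick in shared { print("B:", tick) } }
```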

We are closing in on 1.0, so once that hits, this is definitely an area that I think is worth further discussion and active development/testing/documentation.


Looks great – excited to see this coming to fruition.

I also agree it would be enhanced by the addition of a 'replay' type algorithm to provide late-coming consumers with initial state to, for example, populate any UI whilst waiting on the next element.

The other thing that I think would make this even more useful is the addition of wider support for 'cold' sequences (using the Rx terminology). That would make the 'lazy task initialisation -> shared consumers -> last consumer clean-up' cycle especially appealing.

I can't yet see any simple way of achieving this, unless I'm missing something.

To clarify, it would be great to see a 'cold' dual of AsyncStream. With AsyncStream, the supplied closure appears to execute immediately, creating a 'hot' stream, whereas with something like an AsyncDeferredStream or AsyncLazyStream I'm imagining a closure that executes lazily each time a consumer attempts to iterate it. So essentially a 'cold' async stream that can be iterated multiple times.

The wider utility of this is that when coupled with an AsyncBroadcastSequence you get a really neat way to start/stop these one-to-many type broadcastable sequences.

In a world where we've got primary associated types for async sequences, I'm imagining something like:

public private(set) lazy var quakes: some AsyncSequence<Quake> = {
    AsyncDeferredStream { continuation in
        let monitor = QuakeMonitor()
        monitor.quakeHandler = { quake in
            continuation.yield(quake)
        }
        continuation.onTermination = { @Sendable _ in
             monitor.stopMonitoring()
        }
        monitor.startMonitoring()
    }
    .broadcast(replay: 1)
}()

Here, the closure supplied to AsyncDeferredStream would be executed upon connection of the first consumer, the sequence then shared with all subsequent consumers, before being terminated on cancellation of the last remaining consumer – only to be re-executed on the connection of a new consumer.

I think this works out as a really tidy way of providing one-to-many broadcasts in a resource efficient manner.

I'm not certain that another top-level AsyncSequence would be the best way to do that; instead I think an algorithm that can be applied to any AsyncSequence would have better utility.

That would be great, but how would that look? I can't imagine how it would work without another top-level sequence. Using the example above, there needs to be some way to express that startMonitoring() should only execute when the first consumer arrives, but maybe I'm confused by the terminology.

I can imagine something like an AsyncDeferredSequence wrapping any other sequence. e.g.

public private(set) lazy var quakes: some AsyncSequence<Quake> = {
    AsyncDeferredSequence {
        AsyncStream { continuation in
            let monitor = QuakeMonitor()
            monitor.quakeHandler = { quake in
                continuation.yield(quake)
            }
            continuation.onTermination = { @Sendable _ in
                 monitor.stopMonitoring()
            }
            monitor.startMonitoring()
        }
    }
    .broadcast(replay: 1)
}()

Is that what you mean?

I was thinking something more like:

  AsyncStream { continuation in
    let monitor = QuakeMonitor()
    monitor.quakeHandler = { quake in
      continuation.yield(quake)
    }
    continuation.onTermination = { @Sendable _ in
      monitor.stopMonitoring()
    }
    monitor.startMonitoring()
  }
  .broadcast(replay: 1)

Specifically skipping the wrapper AsyncDeferredSequence and using composition as you picked out already.


Thanks for your response – if it's possible to do it like that, that would be great! It would no doubt be the ideal interface, but I think I'm missing something here.

My understanding with AsyncStream was that the closure supplied to the initialiser would be called immediately, and therefore monitor.startMonitoring() would be called before the first consumer arrives.

i.e. AsyncStream is 'hot'. Is that not the case? Is the closure only called when AsyncStream's iterator is created?

Ah yea that might be an additional algorithm/interface that is needed.

We could streamline it with an autoclosure trick:

public func deferred<Base: AsyncSequence>(_ base: @autoclosure @escaping () -> Base) -> AsyncDeferredSequence<Base>

That way we can reduce the call site to be a bit more succinct.

deferred(AsyncStream { continuation in
    let monitor = QuakeMonitor()
    monitor.quakeHandler = { quake in
        continuation.yield(quake)
    }
    continuation.onTermination = { @Sendable _ in
        monitor.stopMonitoring()
    }
    monitor.startMonitoring()
})
.replay(history: 1)
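For reference, a minimal sketch of what such a deferred wrapper could look like (the names follow the thread and are hypothetical, not part of the package):

```swift
// Hypothetical AsyncDeferredSequence: the base (and any side effects in its
// construction) is built lazily, once per iterator, so work like
// startMonitoring() runs only when a consumer begins iterating.
struct AsyncDeferredSequence<Base: AsyncSequence>: AsyncSequence {
    typealias Element = Base.Element
    typealias AsyncIterator = Base.AsyncIterator

    private let makeBase: () -> Base

    init(_ makeBase: @escaping () -> Base) {
        self.makeBase = makeBase
    }

    func makeAsyncIterator() -> Base.AsyncIterator {
        // Construction is deferred to this point.
        makeBase().makeAsyncIterator()
    }
}

// The autoclosure entry point discussed above; @escaping is required
// because the closure is stored for later use.
func deferred<Base: AsyncSequence>(
    _ base: @autoclosure @escaping () -> Base
) -> AsyncDeferredSequence<Base> {
    AsyncDeferredSequence(base)
}
```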

And I think the replay behavior probably should be its own algorithm and not per se wrapped up in broadcast itself. For that particular part of the family of multicast algorithms, I do wonder if there is ever utility in not having a deferred base? e.g. is the concept of replaying always based upon deferred access?

That's a good question – thinking about the Combine/Rx implementation of multicast(_:), there was the option to provide a subject via a closure. Now, I can't recall whether the closure supplied to Combine's multicast(_:) was deferred, but perhaps it follows, then, that this broadcast algorithm could be called in a similar way.

public func broadcast<Base: AsyncSequence>(_ deferredBase: @autoclosure @escaping () -> Base) -> AsyncBroadcastSequence<Base>

It is not. That is a totally separate thing.

Ah, yes, I see – my mistake. My memory is fuzzy on how I used to handle this with Combine. I used the multicast(_:) operator and followed it up with my own implementation of .referenceCounted() to achieve the lazy initialization effect described above.

For the replay-style broadcasts, personally, I've never used one with a non-deferred base, but then it does seem strange to constrain it to deferred-only if it's at all avoidable.

Off the top of my head, if some app has some logs streaming via a non-deferred sequence, it might be nice to be able to offer the last fifty log entries to new consumers.

Do we need a separate thread for deferred(_:)?

I think I'm getting a bit more clarity on the functionality we're getting here, and it seems to me that broadcast() is the equivalent of the share() operator in Combine. That's a really useful feature to have, but I'll definitely be missing the flexibility that Combine's multicast(_:) operator provides.

I know there's some reluctance to simply reproduce what Combine has done, but I do believe the composability of a Subject coupled with a connection strategy – which Combine's multicast(_:) facilitated – was a powerful combo. Now, I'm not saying all that plumbing necessarily needs to be exposed (although I'd like it if it was), but in terms of thinking about the matrix of possibility for broadcast use-cases, I think it's quite useful.

For me, multicast(_:) in Combine provided a way to compose two things:

  1. Buffering Strategy: through the choice of subject, be it a hand-rolled ReplaySubject or a PassthroughSubject, combined with a:
  2. Connection Strategy: through the choice of the operator immediately following multicast(_:), be it connect(), autoconnect() or a hand-rolled referenceCounted()

To me, that is what multicast(_:) is really all about: bridging a buffering strategy with a connection strategy. So while on the surface it appears complex, the decomposition into those two things means fewer algorithms are needed in the long run.

Having said that, I do agree that it can feel overwhelming on first exposure, so perhaps there is something that can be done to clarify and simplify a subset of that power and flexibility for end users. Roughly, something like:

extension AsyncSequence {
  func broadcast(bufferingStrategy: BufferingStrategy = .last(0), connectionStrategy: ConnectionStrategy = .deferred(reconnects: false)) -> AsyncCompositeBroadcastSequence<Self>
}

enum BufferingStrategy { case last(Int), first(Int) }
enum ConnectionStrategy { case immediate, deferred(reconnects: Bool) }

So what does that actually mean? Well, in terms of use-cases and how you might go about constructing each of them with Combine's API (and assuming there were a ReplaySubject and referenceCounted() operator in Combine) you'd get something like:

  1. broadcast()
    Combine equivalent: multicast { PassthroughSubject() }.autoconnect() or share()
    Use-case: Perhaps something like the QuakeMonitor example where new entries are briefly displayed on a map.*
  2. broadcast(bufferingStrategy: .last(1))
    Combine equivalent: multicast { ReplaySubject(1) }.autoconnect()
    Use-case: Some kind of ephemeral view state where, if a view reappears after being dismissed, it should initially show the state of its predecessor
  3. broadcast(connectionStrategy: .deferred(reconnects: true))
    Combine equivalent: multicast { PassthroughSubject() }.referenceCounted()
    Use case: Distribution of sensor data, perhaps an accelerometer, where it's useful to lazily start up the hardware sensor, and ensure it's shut down when no longer needed.
  4. broadcast(bufferingStrategy: .last(50), connectionStrategy: .immediate)
    Combine equivalent: multicast { ReplaySubject(50) }.connect()
    Use case: Logs. Starts immediately and keeps track of last 50 log entries, ready for when a consumer comes along in order to show a reasonable recent history.

* I actually think this would be a more natural fit for a connection strategy of .deferred(reconnects: true), which is interesting in itself.

In summary: I think to provide for the most common use cases, any multicast(_:) equivalent needs to at least provide 1) support for a buffering strategy (at least .last(Int)), and 2) support for a range of connection strategies (at least .immediate, .deferred(reconnects: Bool)).


Thanks for writing out your detailed feedback here! I really appreciate it!

I agree that we should have a solution for most of the things you laid out here. However, I am not sure if we should overload a single algorithm with a bunch of configuration to achieve all at once. Maybe we can come up with a better naming for the individual things here.

For me personally, multicast means that an AsyncSequence allows multiple consumers (iterators) to consume it; however, it doesn't specify how that is done. broadcast is an algorithm that transforms any sequence into a multicast sequence where each active consumer gets all produced elements.
If we introduce configuration that changes this behaviour, then it becomes hard to reason about what is going on, and even harder to communicate to somebody else which kind of broadcast is happening.

So maybe we can find better names for the kind of multicast transformations that convey what really is going on.


Yes. I think the naming does make things difficult and that's partly what I was trying to communicate.

I also agree a single algorithm shouldn't be overloaded with all this functionality, which is why I refer to the Rx/Combine multicast(_:) which handled the complexity by making the problem composable.

What I mean is, while multicast(_:) in Rx/Combine does facilitate multicasting, it's quite useless by itself. It needs 1) a Subject, and 2) a connectable operator like connect(), autoconnect() or refCounted() to be of any use.

So that's what I was trying to distill: in 95% of use cases for Rx/Combine multicast(_:), the Subject is essentially the choice of a 'buffering strategy', and the follow-up operator (connect()/autoconnect()/refCounted()) the choice of a 'connection strategy'.

By breaking it down in this way, it may make the functionality easier to explain to end users and avoid the need to discuss 'connectable' publishers, etc. – even if under the hood that's how they're constructed.

But I do think that exposing the full matrix of functionality provided by these combinations will be necessary at some point.

(Especially a refCounted() equivalent, I really missed that in Combine, it has so many use cases.)