Async Algorithms `multicast(_:)` or equivalent

Looks great – excited to see this coming to fruition.

I also agree it would be enhanced by a 'replay'-type algorithm that gives late-arriving consumers an initial state, for example to populate a UI while waiting for the next element.

The other thing that I think would make this even more useful is wider support for 'cold' sequences (using the Rx terminology). That would make the 'lazy task initialisation -> shared consumers -> last-consumer clean-up' cycle especially appealing.

I can't yet see any simple way of achieving this, unless I'm missing something.

To clarify, it would be great to see a 'cold' dual of AsyncStream. With AsyncStream, the supplied closure appears to execute immediately, creating a 'hot' stream, whereas with something like an AsyncDeferredStream or AsyncLazyStream I'm imagining a closure that gets executed lazily each time a consumer attempts to iterate over it. So essentially a 'cold' async stream that can be iterated multiple times.
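To make the idea concrete, here is a minimal sketch of what such a type could look like. AsyncDeferredStream is a hypothetical name and is not part of the standard library or swift-async-algorithms. The sketch assumes it is acceptable to run the closure when a consumer calls makeAsyncIterator() (which is what a `for await` loop does when it starts), by building a fresh AsyncStream per consumer.

```swift
// Hypothetical sketch: `AsyncDeferredStream` does not exist in any library.
// It stores the build closure and only runs it when a consumer starts iterating.
struct AsyncDeferredStream<Element>: AsyncSequence {
    typealias AsyncIterator = AsyncStream<Element>.Iterator

    private let build: @Sendable (AsyncStream<Element>.Continuation) -> Void

    init(_ build: @escaping @Sendable (AsyncStream<Element>.Continuation) -> Void) {
        self.build = build
    }

    // Each consumer gets its own AsyncStream, so `build` runs once per iteration.
    func makeAsyncIterator() -> AsyncStream<Element>.Iterator {
        let build = self.build
        return AsyncStream<Element> { continuation in
            build(continuation)
        }.makeAsyncIterator()
    }
}

// Usage: nothing runs until the first `for await` loop starts.
let numbers = AsyncDeferredStream<Int> { continuation in
    for value in 1...3 { continuation.yield(value) }
    continuation.finish()
}

var first: [Int] = []
for await value in numbers { first.append(value) }

// Iterating again re-runs the closure and produces the same elements.
var second: [Int] = []
for await value in numbers { second.append(value) }

print(first, second)
```

Because every consumer triggers its own run of the closure, this on its own is one-to-one; the sharing would still have to come from a broadcast-style algorithm layered on top.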

The wider utility of this is that when coupled with an AsyncBroadcastSequence you get a really neat way to start/stop these one-to-many type broadcastable sequences.

In a world where we've got primary associated types for async sequences, I'm imagining something like:

public private(set) lazy var quakes: some AsyncSequence<Quake> = {
    AsyncDeferredStream { continuation in
        let monitor = QuakeMonitor()
        monitor.quakeHandler = { quake in
            continuation.yield(quake)
        }
        continuation.onTermination = { @Sendable _ in
            monitor.stopMonitoring()
        }
        monitor.startMonitoring()
    }
    .broadcast(replay: 1)
}()

Here, the closure supplied to AsyncDeferredStream would be executed when the first consumer connects. The resulting sequence would then be shared with all subsequent consumers and terminated when the last remaining consumer cancels. It would only be re-executed when a new consumer connects.
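The start/stop cycle described here boils down to counting consumers. The following is only a rough sketch of that bookkeeping, not of a real broadcast algorithm: ConsumerLifecycle and its connect()/disconnect() methods are made-up names for illustration, and a real implementation would also have to fan elements out to each consumer and handle cancellation.

```swift
// Hypothetical helper, not part of any library: runs `start` for the first
// consumer and `stop` once the last consumer has gone.
actor ConsumerLifecycle {
    private var consumerCount = 0
    private let start: @Sendable () -> Void
    private let stop: @Sendable () -> Void

    var isRunning: Bool { consumerCount > 0 }

    init(start: @escaping @Sendable () -> Void, stop: @escaping @Sendable () -> Void) {
        self.start = start
        self.stop = stop
    }

    func connect() {
        consumerCount += 1
        if consumerCount == 1 { start() }   // first consumer: begin upstream work
    }

    func disconnect() {
        precondition(consumerCount > 0, "disconnect() without a matching connect()")
        consumerCount -= 1
        if consumerCount == 0 { stop() }    // last consumer gone: tear down
    }
}

let lifecycle = ConsumerLifecycle(
    start: { print("start monitoring") },
    stop: { print("stop monitoring") }
)

await lifecycle.connect()      // first consumer starts the upstream
await lifecycle.connect()      // second consumer shares the running upstream
await lifecycle.disconnect()
await lifecycle.disconnect()   // last consumer leaves, upstream stops
await lifecycle.connect()      // a new consumer starts it again
```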

I think this works out as a really tidy way of providing one-to-many broadcasts in a resource-efficient manner.