Swift Async Algorithms Proposal: AsyncDeferredSequence

Deferred

Introduction

AsyncDeferredSequence provides a convenient way to postpone the initialization of a sequence to the point where it is requested by a sequence consumer.

Motivation

Some source sequences may perform expensive work on initialization. This could be network activity, sensor activity, or anything else that consumes system resources. While this can be mitigated in some simple situtations by only passing around a sequence at the point of use, often it is favorable to be able to pass a sequence to its eventual point of use without commencing its initialization process. This is especially true for sequences which are intended for multicast/broadcast for which a reliable startup and shutdown procedure is essential.

A simple example of a seqeunce which may benefit from being deferred is provided in the documentation for AsyncStream:

extension QuakeMonitor {

    static var quakes: AsyncStream<Quake> {
        AsyncStream { continuation in
            let monitor = QuakeMonitor()
            monitor.quakeHandler = { quake in
                continuation.yield(quake)
            }
            continuation.onTermination = { @Sendable _ in
                 monitor.stopMonitoring()
            }
            monitor.startMonitoring()
        }
    }
}

In the supplied code sample, the closure provided to the AsyncStream initializer will be executed immediately upon initialization; QuakeMonitor.startMonitoring() will be called, and the stream will then begin buffering its contents waiting to be iterated. Whilst this behavior is sometimes desirable, on other occasions it can cause system resources to be consumed unnecessarily.

let nonDeferredSequence = QuakeMonitor.quakes //  `Quake.startMonitoring()` is called now!

...
// at some arbitrary point, possibly hours later...
for await quake in nonDeferredSequence {
    print("Quake: \(quake.date)")
}
// Prints out hours of previously buffered quake data before showing the latest

Proposed solution

AsyncDeferredSequence provides a way to postpone the initialization of an an arbitrary async sequence until the point of use:

let deferredSequence = deferred(QuakeMonitor.quakes) // Now, initialization is postponed

...
// at some arbitrary point, possibly hours later...
for await quake in deferredSequence {  //  `Quake.startMonitoring()` is now called
    print("Quake: \(quake.date)")
}
// Prints out only the latest quake data

Now, potentially expensive system resources are consumed only at the point they're needed.

Detailed design

AsyncDeferredSequence is a trivial algorithm supported by some convenience functions.

Functions

public func deferred<Base: AsyncSequence>(_ createSequence: @escaping @Sendable () async -> Base) -> AsyncDeferredSequence<Base>
public func deferred<Base: AsyncSequence>(_ createSequence: @autoclosure @escaping @Sendable () -> Base) -> AsyncDeferredSequence<Base>

The synchronous function can be auto-escaped, simplifying the call-site. While the async variant allows a sequence to be initialized within a concurrency context other than that of the end consumer.

public struct AsyncDeferredSequence<Base: AsyncSequence>: Sendable {
  public typealias Element = Base.Element
  public struct Iterator: AsyncIteratorProtocol {
    public mutating func next() async rethrows -> Element?
  }
  public func makeAsyncIterator() -> Iterator
}

Naming

The deferred(_:) function takes its inspiration from the Combine publisher of the same name with similar functionality. However, lazy(_:) could be quite fitting, too.

Comparison with other libraries

ReactiveX ReactiveX has an API definition of Defer as a top level operator for generating observables.

Combine Combine has an API definition of Deferred as a top-level convenience publisher.

Effect on API resilience

Deferred has a trivial implementation and is marked as @frozen and @inlinable. This removes the ability of this type and functions to be ABI resilient boundaries at the benefit of being highly optimizable.

Alternatives considered

3 Likes

Thanks for opening this proposal!

I think I understand why you want this e.g. wrapping current closure based APIs that should only be lazily called once an iterator has been created. If I get that correct I would like to propose a slightly different alternative here: Instead of having an algorithm that lazily creates an upstream sequence what do you think about having a variant of AsyncStream that only invokes it's closure when the first demand is signalled.

My thinking behind this is that the composability of AsyncDeferredSequence is slightly awkward since you need to supply a factory closure to it. Furthermore, from my experience this is almost always needed in the case of starting a new root AsyncSequence and not as a transformation applied further down the algorithm chain.

Let me know what you think about that!

1 Like

No problem! Happy to contribute in some small way.

That's correct. Or, any 'hot' sequence that we may come across in the future which we need to convert to be cold.

I'm not opposed to that. In fact, it's what I originally proposed in the multicast(_:) thread, but I do wonder if they'll be sequences other than AsyncStream that need to be converted to cold – possibly third-party sequences.

I think this would be nice, but I don't think it solves the original inspiration for this problem, which we thrash about a little in the multicast(_:) thread. Essentially, multicast(_:) needs someway of restarting its source sequence. If that's an AsyncStream, it needs to be an AsyncStream which invokes its closure for each successive consumer.

I'm not opposed to this solution at all, but I do wonder if a generalised deferred(_:) will be useful for converting other hot sequences people may come across.

Sorry I completely missed the older parts of the multicast discussion. However, I don't think this statement holds true:

The idea behind broadcast would be that there is a single upstream AsyncSequence that gets consumed by the AsyncBroadcastSequence. The AsyncBroadcastSequence is then responsible to forward each element to the downstream consumers. There should not be a new iterator be created to the upstream sequence for every downstream consumer. This would defeat the point of broadcast since it should be able to transform a unicast sequence to a broadcasting one.

In general, I am also not super opposed to deferred; however, I am not the biggest fan of its composability. Maybe we should rather start with a base sequence that is lazy and take it from there.

Would love to hear what @Philippe_Hausler thinks about this!

Hi

@Philippe_Hausler why is AsyncStream factory function not deferred in the first place? Is it only to be able to escape the continuation so we can use it in an imperative way ?

I think I agree with @FranzBusch. If the real need for a deferred operator comes from a particularity of the AsyncStream implementation then it might not apply elsewhere.

Well, it depends on what you're trying to do.

I think one of the huge benefits is being able to compose sequences to produce different effects. When you compose a (cold sequence)->(multicast)->(referenceCounted) you get the effect of a cold sequence that is only shared when there are active consumers. It also allows the encapsulation of startup and shutdown of that cold sequence.

So, yes, a cold sequence is only shared once at any one time, but if there are subsequent sessions, that cold observable needs to be started up and shutdown. It needs to be called multiple times.

Edit: I should add, @Philippe_Hausler 's implementation of multicast(_) in the previously mentioned thread does away with the need for refCounted, so all that is required is the deferred(_:) operator.

1 Like

AFAIK that is correct. The goal of AsyncStream was to provide a root AsyncSequence where other code can yield/finish it. Escaping the continuation is from what I have seen the thing that 95% of the users are doing.

I agree on that 100%. Algorithms should always be cold unless stated otherwise and the root AsyncSequence is basically dictating if the underlying production is cold/hot.

I understand what you want to achieve here. However, that is not the goal of broadcast in my opinion. broadcast is able to transform a unicast AsyncSequence to a multicast one where each active consumer gets all the produced elements. It is really important that while doing so it only creates a single AsyncIterator of the upstream sequence since not all AsyncSequences are supporting multiple iterators e.g. unicast AsyncSequences support only 1.

What you want is definitely something that should be supported as well but it is slightly different than what broadcast ought to do in my opinion.

1 Like

Though the above is probably for the other thread about broadcast. In general, I am in support of adding the deferred functionality. It has its use-cases and makes sense to add!

1 Like

I think we're in agreement here. When using multicast we want to only have one source sequence. 100% agree, no argument on that.

Let's take an example. Imagine a cold sequence that, when subscribed to, starts an expensive sensor (accelerometer, mic, etc.) and when cancelled, stops that sensor. Obviously there's only one sensor so it needs to be shared (multicast). Now, the problem is we don't know when consumers will come along, if they'll come along, and how many will come along.

We need some way of lazily starting the sensor when a consumer arrives, and then shutting off the sensor when the last consumer leaves. And crucially, we need to be able to restart it if a new consumer comes along.

Therefore, while I agree that there should only ever be, at most, one concurrent source sequence, over time we could need to generate that source sequence multiple times.

1 Like

My statement is slightly different. When using broadcast we want to have only a single upstream AsyncSequence and we want to create a single iterator to that sequence.

I understand that and I agree that this ought to be modelled; however, I think this requires slightly different types. What you want is a root AsyncSequence that only produces values when there is a consumer (iterator). That is easily implementable.
At the same time you want the elements to be broadcasted. The problem here is that the broadcast algorithm creates a single iterator which is kept alive until the AsyncBroadcastSequence is deinited and there are no remaining consumers (iterators).

Thinking out loudly here maybe this would be composed like this:

  • OnOffAsyncSequence (bad name but basically informs you when there is an iterator or not so that you toggle the sensor on/off)
  • OnOffBroadcastAsyncSequence (even worse name but basically a broadcast algorithm that only has an upstream iterator when there is a downstream iterator. This might be even a configuration on the proposed broadcast algorithm)

I think what we are lacking in general a bit is a more defined terminology and ruleset around AsyncSequences so that we have a better foundation of discussing all of this :slight_smile:

In my opinion all of this here is very useful discussion that we need to have to make sure we are supporting every use-case!

1 Like

Yes, this is essentially what I'm trying to do with deferred(_:) and an AsyncStream. Does it achieve that?

I see... I wonder if there is scope to make broadcast(_:) 'lazy' on iterating its source? i.e when the first consumer arrives, and then cancelling once the last consumer leaves. Maybe @Philippe_Hausler could give some insight?

Sounds interesting. I seem to come across this pattern quite often.

Sounds good to me! Happy to chime in where I can. :slight_smile:

The WIP broadcast implementation I have up right now starts the iteration upon the first ask to next(). But it does not cancel once the last consumer leaves... because you could have more incoming, but it does cancel when the last reference is deallocated (e.g. it is known to be unique).

1 Like

Ah, I see! I think I finally get it now. So it's equivalent to the share() operator in Combine, or .multicast { PassthroughSubject() }.autoconnect(). I definitely have more thoughts on that then, which I'll write up in the multicast(_:) thread.

EDIT: I've now written up those thoughts on multicast(_:) here.