Kickoff of a new season of development for AsyncAlgorithms; Share

First off, I would like to apologize for the time that has elapsed since this was a focus of mine; other tasks have taken up a good swath of my time and have left development of AsyncAlgorithms languishing a bit. In an effort to re-prime development around this package there are a few outstanding administrative tasks, namely cutting a release for 1.0.4 in preparation for development of 1.1.

To that end, as we start development for 1.1: we left off with a few algorithms marked for consideration. I will be reaching out to folks on PRs shortly to revitalize them; if you already have an open request marked for 1.1, please feel free to DM me or ping me on GitHub to see where we can schedule things.

Perhaps the most requested feature in the category of 1.1 features is the family of sharing algorithms, and particularly the specific algorithm share. So without further delay: the pitch!

Share

Introduction

Many of the AsyncSequence adopting types only permit a single consumption. However, there are many cases where the same produced values are useful in more than one place. Out of that need there are a few approaches to share, distribute, and broadcast values. This proposal will focus on one concept: sharing. Sharing is where each consumer can independently make forward progress and receive the same values, but values are not replayed from the beginning of time.

Motivation

There are many potential usages for the sharing concept of AsyncSequences.

One such example is the case where a source of data, exposed as an asynchronous sequence, needs to be consumed to update UI, for logging, and additionally by a network connection. What matters in this case is not the particular uses but that those uses are independent of each other. It would not be expected for networking to block or delay the updates to UI, nor should logging. This example also illustrates that the isolation of each side might be different and that some of the sides may not tolerate coalescing or dropping values.

There are many other use cases that have been requested for this family of algorithms. Since the release of AsyncAlgorithms this has perhaps been the most popularly requested set of behaviors to add to the package.

Proposed solution

AsyncAlgorithms will introduce a new extension function on AsyncSequence that provides a shareable asynchronous sequence which produces the same values upon iteration from multiple instances of its AsyncIterator. Those iterations can take place in multiple isolations.

When values from a differing isolation cannot be coalesced, the two options available are either awaiting (an exertion of back-pressure across the sequences) or buffering (an internal back-pressure to a buffer). Replaying the values from the creation of the sequence onward is a distinctly different behavior that should be considered a separate use case. This leaves the behavioral characteristic of this particular share operation as: sharing a buffer of values starting from the initialization of a new iteration of the sequence. Control over that buffer should then have options to determine the behavior, similar to the control AsyncStream allows. It should have options to be unbounded, to buffer the oldest count of elements, or to buffer the newest count of elements.

It is critical to identify that this is one algorithm in the family of algorithms for sharing values. It should not attempt to solve all behavioral requirements but instead serve a common set of them that make cohesive sense together. This proposal is not mutually exclusive to the other algorithms in the sharing family.

Detailed design

A new extension will be added to return a Sendable AsyncSequence. This extension will take a buffering policy to identify how the buffer will be handled when iterations do not consume at the same rate.

The Sendable annotation identifies to the developer that this sequence can be shared and stored in an existential any.

extension AsyncSequence where Element: Sendable {
  public func share(
    bufferingPolicy: AsyncBufferSequencePolicy = .unbounded
  ) -> some AsyncSequence<Element, Failure> & Sendable
}
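
For illustration, a minimal usage sketch against the pitched signature (assuming the pending implementation; the element values here are arbitrary):

import AsyncAlgorithms

// Because the result is Sendable, it can be stored in an existential
// and iterated from multiple tasks in different isolations.
let numbers = [1, 2, 3].async.share()
let stored: any AsyncSequence<Int, Never> & Sendable = numbers

let first = Task { for await n in stored { print("first", n) } }
let second = Task { for await n in stored { print("second", n) } }
await first.value
await second.value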

The buffer internal to the share algorithm only extends back to the furthest unconsumed element, and there is only a single buffer shared across all iterators. This ensures that, with the application of the buffering policy, the storage size is as minimal as possible while still allowing all iterations to avoid dropping values and keeping the memory usage in check. The signature reuses the existing AsyncBufferSequencePolicy type to specify the behavior around buffering: either how it should limit emitting into the buffer or what should happen when the buffer is exceeded.
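
Concretely, the policy options map onto the existing AsyncBufferSequencePolicy cases; a sketch of the configurations (assuming the pitched API):

import AsyncAlgorithms

let events = [0, 1, 2, 3].async

let shared  = events.share()                                      // defaults to .unbounded
let bounded = events.share(bufferingPolicy: .bounded(4))          // limit emission into the buffer
let newest  = events.share(bufferingPolicy: .bufferingNewest(4))  // drop oldest elements on overflow
let oldest  = events.share(bufferingPolicy: .bufferingOldest(4))  // drop newest elements on overflow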

Runtime Behavior

The runtime behaviors fall into a few categories: ordering, iteration isolation, cancellation, and lifetimes. To understand these behaviors there are a few terms worth defining. Each creation of the AsyncIterator of the sequence and invocation of next will be referred to as a side of the share iteration. The back-pressure on the system to fetch a new element or termination is referred to as demand. The limit is the pending gate that awaits until the buffer has been serviced; it is used for the AsyncBufferSequencePolicy.bounded(_: Int) policy. The last special definition is the extent, which in this case is specifically the lifetime of the asynchronous sequence itself.

When the underlying type backing the share algorithm is constructed, a new extent is created; this is used for tracking the reference lifetime under the hood, both to house the iteration and to identify the point at which no more sides can be constructed. When no more sides can be constructed and no sides are left to iterate, the backing iteration is canceled. This prevents any unreferenced task backing the iteration from being leaked by the algorithm itself.

That construction then creates an initial shared state and buffer. No task is started initially; it is only upon the first demand that the task backing the iteration is started. This means that on the first call to next a task is spun up servicing all potential sides. The order in which the sides are serviced is not specified and cannot be relied upon; however, the order of delivery within a side is always guaranteed to be ordered. The singular task servicing the iteration is the only place holding any sort of iterator from the base AsyncSequence, so that iterator is isolated and not sent from one isolation to another. That iteration first awaits any limit availability and then awaits a demand given by a side. After which, it awaits an element or terminal event from the iterator and enqueues the elements to the buffer.

The buffer itself is only held in one location; each side, however, has a cursor index into that buffer, and when values are consumed the indexes are adjusted accordingly, leaving the buffer usage only as big as the largest deficit. This means that new sides started after the initial startup will not have a "replay" effect; that is a similar but distinct algorithm and is not addressed by this proposal. Any buffer-size-sensitive systems that wish to adjust behavior should be aware that specifying a policy is a suggested step. However, in common usage, similar to other such systems servicing desktop and mobile applications, the default and common behavior is to be unbounded. This allows for progressive disclosure, from common usage that just works out of the box with no configuration to more advanced cases that need finer-grained control. Furthermore there are scenarios where one might want ways of identifying dropped value events within the iteration of a side; this will be addressed in an upcoming proposal.
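
A highly simplified sketch of that bookkeeping (illustrative only, not the actual implementation; the real state machine also handles demand, limits, and termination):

// One shared buffer, one cursor per side; trimming keeps storage no
// larger than the slowest side's deficit.
struct SideBuffer<Element> {
    private var buffer: [Element] = []
    private var cursors: [Int: Int] = [:]  // side ID -> next index to read

    mutating func enqueue(_ element: Element) {
        buffer.append(element)
    }

    mutating func next(for side: Int) -> Element? {
        let index = cursors[side, default: 0]
        guard index < buffer.count else { return nil }  // side must await more elements
        let element = buffer[index]
        cursors[side] = index + 1
        trim()
        return element
    }

    // Drop elements every side has already consumed.
    private mutating func trim() {
        guard let minimum = cursors.values.min(), minimum > 0 else { return }
        buffer.removeFirst(minimum)
        for key in cursors.keys { cursors[key]! -= minimum }
    }
}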

As previously stated, the iteration of the upstream/base AsyncSequence is isolated to a detached task; this ensures that individual sides can have independent cancellation. Those cancellations have the effect of removing that side from the shared iteration and cleaning up accordingly (including adjusting the trimming of the internal buffer).

It is difficult to express all potential examples of concurrent access, but a few cases are included with this proposal to illustrate some of the behaviors. If a more comprehensive behavioral analysis is needed, it is strongly suggested to try out the pending pull request to identify how specific behaviors work. Please keep in mind that the ordering between tasks is not specified; only the order within one side of iteration is.

Practically this all means that a given iteration may be "behind" another and can eventually catch up (provided it is within the buffer limit).

let exampleSource = [0, 1, 2, 3, 4].async.share()

let t1 = Task {
  for await element in exampleSource {
    if element == 0 {
      try? await Task.sleep(for: .seconds(1))
    }
    print("Task 1", element)
  }
}

let t2 = Task {
  for await element in exampleSource {
    if element == 3 {
      try? await Task.sleep(for: .seconds(1))
    }
    print("Task 2", element)
  }
}

await t1.value
await t2.value

This example will print a possible ordering of the following:

Task 2 0
Task 2 1
Task 2 2
Task 1 0
Task 2 3
Task 2 4
Task 1 1
Task 1 2
Task 1 3
Task 1 4

The order of the interleaving of the prints is not guaranteed; however, the order of the elements per iteration is. Likewise, in this buffering case it is guaranteed that all values are represented in the output.

If the creation were instead altered to the following:

let exampleSource = [0, 1, 2, 3, 4].async.share(bufferingPolicy: .bufferingNewest(2))

The output would then print a possible ordering of:

Task 2 0
Task 2 1
Task 2 2
Task 1 0
Task 2 4
Task 1 3
Task 1 4

Some values are dropped due to the buffering policy, but the sides eventually reach consistency: a slow side's deficit can exceed the two-element buffer, and with .bufferingNewest the oldest pending elements are evicted to make room for new ones. The same mechanism applies to the following:

let exampleSource = [0, 1, 2, 3, 4].async.share(bufferingPolicy: .bufferingOldest(2))

This could print the possible ordering of:
Task 2 0
Task 2 1
Task 2 2
Task 1 0
Task 2 4
Task 1 1
Task 1 2

However, in this particular case it is the newest values that are dropped.

Effect on API resilience

This is an additive API and no existing systems are changed; however, it will introduce a few new types that will need to be maintained as ABI interfaces. Since the intent is to provide a mechanism to store AsyncSequences in a shared context, the type must be exposed as ABI (for type sizing).

Alternatives considered

It was considered to nest the buffering policy inside the AsyncShareSequence type. However, since this seems to be something that will be useful for other types, it makes sense to reuse an existing top-level type. If it is determined that a general form of a buffering policy requires additional behaviors, this placement could be debated and moved back to an interior type similar to AsyncStream.

The buffering policy could easily not provide a default parameter and therefore require all developers to consider which policy suits their use; however, that seems too sharp a cliff for progressive disclosure.

A new protocol for identifying sharable AsyncSequence types could be introduced; however, that does not seem to be fully required at this time.
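
For reference, such a protocol would presumably be little more than a marker; a sketch of the considered (and not proposed) shape:

// Hypothetical marker protocol for sequences that already tolerate
// concurrent iteration; not part of this proposal.
public protocol SharableAsyncSequence: AsyncSequence, Sendable
  where Element: Sendable {}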

58 Likes

Hmm… interesting! So the plan is for this share function to define a default implementation that returns a "sharable" sequence?

To what extent is this one default implementation expected to work across concrete async sequences like AsyncStream or AsyncChannel?

I guess another way to phrase that is: should the default implementation just fatal error or throw, with the expectation that a product engineer who needed sharing would either write or import a custom share function?

If we did return a "good" share sequence from the default share function would there still be legit use cases where a product engineer would want to override that default with their own implementation?

I don't see the underlying Iterator type documented in the pitch. Would the expectation be that the underlying Iterator does not ship as a public type?

So this algorithm effectively "turns" a non-sharable sequence into a sharable sequence with the values produced from the base. E.g. if the base had an assertion against concurrent iteration, that assertion would NOT be hit by using .share() on it.

2 Likes

While I appreciate the slightly more "advanced" example with the if conditions - I think it would be beneficial to start with the trivial case to illustrate the lack of inter-subscription ordering.

You are basically saying that - without the `if`s - we could have seen e.g.


Task 2 0
Task 2 1
Task 2 2
Task 1 0
Task 1 1
Task 2 3
Task 1 2
Task 1 3
Task 1 4
Task 2 4

Right?

Also, sorry, I forgot to say: bravo, well done, finally! I'm a member of the crowd who has been requesting and awaiting (pun intended) this algorithm for years. I'm very happy to finally see a formal pitch!

While the implementation is interesting and educational, I do think that this pitch leans a bit too heavily on the implementation details side; I'd prefer seeing more details about the API, specifically by use of examples.

I might be a bit slow today - I still haven't had my morning coffee - and I must admit I did not fully understand the effect of the bufferingPolicy values .buffering* on the async sequence. Can you explain in detail why the Task 2 3 and Task 1 1 lines were omitted by use of .bufferingNewest(2)?

Perhaps you can update the pitch with more descriptive comments about this behaviour? Perhaps this pitch assumes knowledge of these buffering policies? (I lack such knowledge.)

Perhaps you can share (pun intended) some unit tests from the PR (heavily documented in line so it is easy to follow).

And again, thank you for your hard work!

4 Likes

@vanvoorden So the plan is for this share function to define a default implementation that returns a "sharable" sequence?

I'm not sure if I'm interpreting your post correctly, but I got the impression that you are understanding the pitch in a different way than I do.

I think the pitch does not offer a way to customize the creation of the AsyncShareSequence. It just adds a share() function to AsyncSequence which can't be overridden, as it's not an additional requirement of the protocol.

The AsyncShareSequence will always be of the same concrete type and use the normal public API to access the upstream sequence. So it's not necessary to provide a specialized version for different types of upstream sequences.

Sorry in case I got you wrong and this did not help in clarifying things.

1 Like

Thanks for moving this forward! I'm excited to see the share algorithm being prioritized. The pitch frames share as one algorithm in a larger family rather than the whole solution and I think that's accurate. I’d like to build on that framing and outline why a more general foundation might be worth considering.

Many types of multicast strategy

As we've identified in previous discussions, there are quite a few different behaviors we might want from multicasting algorithms:

Initiation: start immediately · start manually · start on first demand · start when n consumers demand
Cancellation: cancel when any consumer cancels · cancel when all consumers cancel · cancel only when last reference is dropped
Restart: never restart · restart with same initiation rules
Emission timing: forward as soon as a consumer is ready (unsynchronised) · forward when all are ready (synchronised)
Back-pressure: run at source rate · pace to fastest consumer · pace to slowest consumer
Buffering and replay: broadcast only new values · replay n values to late subscribers · round-robin across subscribers

I think what this highlights is that we're dealing with a design space rather than a single algorithm, and as such it's really important to get the foundation right.

AFAICT, the pitch fixes one concrete point in this matrix: initiation on first demand, automatic cancellation when the last reference is dropped, no back‑pressure, shared buffer that broadcasts from the current element forward for new subscribers. That clarity is great, but keeping the above axes visible will help steer future additions.

Source vs Chained Approach

While the proposal follows a chained style (like map or filter), multicasting is ultimately a publication concern. Splitting publication from consumption suggests introducing a dedicated protocol:

// Simplified; modern isolation and error handling omitted for brevity
public protocol AsyncSource {
    associatedtype Input
    mutating func send(_ input: Input) async
    mutating func finish()
}

With that in place we could write a multicast‑capable type that is both a source and a sequence:

let sensor = AsyncBroadcastSource<Int>() // AsyncSource & AsyncSequence

// UI and logger iterate independently
for await value in sensor.share() { … } // consumer 1
for await value in sensor.share() { … } // consumer 2

A small set of such sources - AsyncBroadcastSource, AsyncReplaySource, maybe AsyncPassthroughSource - would cover most practical needs while leaving room for third-party expansion and specialisation.

Single-shot source sequences in the standard library

More broadly though, one of the main challenges developers face today is that source sequences like AsyncStream can't be safely iterated multiple times - they're "single shot" with undefined behavior on multiple iterations.

This seems at odds with Swift's design philosophy. It would be like including a Collection implementation in the standard library that is single-shot and for which successive iteration is undefined. Technically possible to implement, but I assume something we'd prefer new users aren't exposed to when picking up a new language.

A new set of built-in source multicast asynchronous sequences would be a good way to address this issue.

Proposed Solution

  1. Add AsyncSource to the standard library (or AsyncAlgorithms in incubation).
  2. Ship a minimal set of source types (Broadcast, Replay, …) that conform to AsyncSource & AsyncSequence and cover the common matrix points above. (Broadcast could broadly adopt the strategies outlined in the pitch and be the first AsyncSource-conforming type to ship.)
  3. Keep share as the ergonomic adapter that promotes any single‑consumer AsyncSequence into a multicast‑capable one by forwarding to an AsyncSource.
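
As a sketch of step 3, built only on the AsyncSource shape above (shapes and names are illustrative; isolation and error handling omitted as in the protocol sketch):

// Pump a single-consumer base sequence into a source; consumers then
// iterate the multicast-capable source rather than the base.
func forward<Base: AsyncSequence & Sendable, Source: AsyncSource & Sendable>(
    _ base: Base, into source: Source
) -> Task<Void, any Error> where Source.Input == Base.Element {
    Task {
        var source = source
        for try await element in base {
            await source.send(element)
        }
        source.finish()
    }
}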

Thanks again for kicking off this pitch, looking forward to asynchronous sequences reaching their potential.

9 Likes

Thanks, Philippe, for moving this forward!

I wonder if it makes sense to look at (and maybe synchronize with) @FranzBusch’s pitch about a reverse semantics operator: MultiProducerSingleConsumerAsyncChannel.

I like the clarity of the name, even though it's maybe a bit unwieldy.

1 Like

I'm all for it! One pattern that we use a lot is the single producer multiple consumer (we call it a Publisher) and that feels like a good fit for AsyncAlgorithms.

I’m really excited to see a return to AsyncAlgorithms, but as an iOS developer, I’m not clear if this change is intended for me. I’m not suggesting it’s wrong. I’m just worried that when this is released, it will almost but still not quite be enough for the things we need every day.

Which app-dev use cases are included in the "motivating examples" that are driving this design? In particular, how would an iOS dev use this to replace existing Combine code that we couldn't before?

Rob

5 Likes

I think this looks great!

Question on buffering: if in the example we waited until task one got all 5 values BEFORE task two was started, would task two get all 5 values or would it get zero values?

I ask because I've definitely had Combine cases where a subscriber missed the one and only value of a publisher and I'd want to make sure I got something. I'd also like to know if everything is stored forever or what exactly. Thanks!

1 Like

I've had Combine cases like this. An example would be listening to some kind of complex publisher of, say, the contents of a search field being typed into, and having two different Combine subscribe chains that both reacted to it but needed more inputs. Say, like the results of the search and also the state of some supporting search UI.

In those cases if I didn’t have something like this, a value might go only to one of the chains and they wouldn’t be in sync.

1 Like

Correct; that would be a reasonable ordering. In practice however it usually is 0, 0, 1, 1, 2, 2, 3, 3, 4, 4.

That might be the most productive way to get that out there; enumerating them all in the pitch is difficult to describe in prose - a test might be more elucidating. It is on my list of things to do.

100% correct.

That is a really good outline of what I would like to consider for other members of that family. I think it is reasonable to include part of that in the pitch! So if you don't mind, I'd love to get that in there so we can use it as documentation for the others that may come along.

I tried that approach a while back; it unfortunately ends up being rather tricky to implement that way. From my previous noodling it almost feels intractable as a route to make a functional sharing system.

These are not fully at odds; Sequence has the same consideration of being "one-shot", while Collection et al. types don't. In reality, if we had the power to retroactively change history, there could have been a world in which we made ~Copyable types before Sequence, and both AsyncSequence and Sequence would be ~Copyable by default with only "sharable" AsyncSequence types being Copyable.

This is definitely one of the next few 1.1 types I was planning on running.

AsyncAlgorithms, and specifically .share(), is most assuredly intended for app developers. Specifically, you mentioned replacing usages of Combine; this is one of the sore spots for AsyncSequence that has prevented folks from migrating from Combine to async/await. Hopefully adding share (as an analog for the Publisher .share) will bridge the last few remaining gaps.

In this case the second task might not get any values, because at the point of attachment the buffer would be fully drained. There is another similar algorithm that does replay things, but share is not that one.
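
To sketch that scenario in code (assuming the pitched behavior):

let source = [0, 1, 2, 3, 4].async.share()

// The first side consumes everything; the iteration finishes and the
// buffer drains.
for await element in source { print("first:", element) }

// A side created afterwards attaches to an already-finished iteration,
// so it would receive no values; share does not replay.
for await element in source { print("second:", element) }  // prints nothing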

5 Likes

Thanks for the confirmation. Do you have examples of real-ish Combine code that this would let us replace? Currently, I can almost never use async streams to replace Combine (every time I've tried, I've eventually put the Combine back in). I'd really like to see which cases this would let me replace. Do you have a sample SwiftUI app that shows any of this?

1 Like

So the analog API is share(). The example there can be somewhat transliterated using AsyncAlgorithms. But practically any place that share would have been used with Combine would be an appropriate analog to this share function - they are very similar.

Most of the interop from Combine to SwiftUI was via ObservableObject and @Published; those are instead done via @Observable, and the remaining non-UI usages (app logic or otherwise) of @Published can be expressed via things like Observations (SE-0475). Perhaps the other key integration point for asynchronous sequence consumption (which an iterating "side" of a share would be used for) would be in uses of .task { ... }.

Granted this is a bit off-topic, but it might be good to gather some feedback on why you would have to "put Combine back in", since that is a severe stumbling block for migration to strict concurrency checking. It might be worthwhile to start up another thread just for that discussion and collect in earnest what things would eventually allow for the full migration to using concurrency, with AsyncAlgorithms as a vehicle for that.

5 Likes

Of course! It's not an exhaustive list, but it's probably a good start.

I imagine modern concurrency makes it tougher still. Perhaps with SAA as a separate library, it still leaves the door open for change in the future if the language adapts to make it possible.

@cocoaphony's concerns about being "almost but still not quite" enough reflect my own: with the size of the design space it's very hard to get to a one-size-fits-all solution with a single type.

Even the initiation axis is quite broadly varied (from my experience using rx/combine) and it's difficult for me to choose just one.

Stuff like manual starts being necessary when you have N consumers and you can only begin sending events once they're all connected. Or replays being necessary for UI bits. Or needing to discard sources so that they properly dereference some expensive resource once the last consumer leaves, but then restart it when a new one appears. Etc.

So, I'm not really fixed on a particular solution, but I do think it's worth considering upfront how we make the idea of multicast sequences composable so that we don't end up with an overwhelming number of single-purpose types, or worse, provide only a partial solution to the problem.

Sorry, yes, I meant Sequence! The ~Copyable solution makes a lot of sense to me.

Regardless, I still think a multicast-capable AsyncSource & AsyncSequence conforming type in the standard library would make asynchronous sequences much more accessible and intuitive for newcomers to the language. And it just seems the more composable design. So fingers crossed we solve whatever blockers are preventing that from happening.

Thanks Philippe. One question I had was around Combine's multicast; I've used that a lot to not only buffer and not miss values but essentially to also share that buffer.

Just to make sure I understand: this does do the sharing but not the buffering control that I would want (making sure not to drop the last item for new connections). And thus multicasting would be a different, additional API that would require an additional proposal?

Nice to see resumption of work here.

Similar to @tcldr, I think it is a requirement to be very specific about the exact behavior of cancellation whenever we design fan-out operations.

There's a short note on the behavior here:

As previously stated, the iteration of the upstream/base AsyncSequence is isolated to a detached task; this ensures that individual sides can have independent cancellation. Those cancellations have the effect of removing that side from the shared iteration and cleaning up accordingly (including adjusting the trimming of the internal buffer).

There's more to be explained here; I think cancellation needs its own section with examples. Specifically, we're saying that a cancellation of a single downstream does not cancel the upstream of the share operator. This is important and worth calling out explicitly.

Alternate behaviors, like cancelling upstream and failing other downstreams, or just completing them upon any downstream cancellation, can also be legal and useful, and I think we'll either want to provide a strategy or offer other operators which have such behavior. Perhaps a CancellationStrategy enum would be nicest to express it here? It could be done in the future though.

7 Likes

I was toying with this once and found that it could lead to odd, "priority-inversion-like" behaviours, where a task which is not as "important" would be cancelled readily, thus cancelling the whole stream for other high-importance tasks that are less likely to cancel themselves in the first place.

This could be valid indeed for the cases where all consumers are equally likely to be cancelled, but I'd argue this should be spelled out as explicitly as humanly possible, perhaps even warranting a new type.

Yeah, explicitness for those behaviors is good, but I don't think it necessarily needs to be a new operator, because we'd end up with a lot of operators with names either very descriptive or not descriptive enough. I'd suggest we define a set of CancellationPolicy cases and share those between any fan-out operations we may end up supporting.
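
One possible shape, with placeholder names:

// Sketch of the suggested policy (cases and names are bikeshed fodder):
public enum CancellationPolicy: Sendable {
    case cancelWhenLastSideCancels       // the behavior pitched as default
    case cancelOnFirstDownstreamCancel   // tear down upstream on any cancel
    case finishDownstreamsOnFirstCancel  // complete remaining sides instead
}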

share(bufferingPolicy: .bufferingNewest(2), cancellationPolicy: .cancelOnFirstDownstreamCancel) - I'm sure there's a bikeshed to be had about the names here though.

There are also other decisions, like the buffering behavior, so this goes in line with the "I need to configure the behavior of fan-out operations" we've already acknowledged by allowing configuring buffering.

1 Like