Kickoff of a new season of development for AsyncAlgorithms; Share

Nope; .bounded(10) would make all consumers await the resumption when the buffer drops below 10 entries. This just shifts the buffering to the runtime rather than the operator.

I have an idea for this, because detecting dropped values is valuable and I am convinced we can compose a solution for it. We are currently missing two parts: the concept of share, and the .enumerated() algorithm. Those, combined with the reductions algorithm, can let downstream consumers know when a value is dropped, either by share or by any other buffering system in between. But that is for a future pitch.
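To make the idea concrete, here is a minimal synchronous sketch of gap detection over index-tagged elements. It is only an assumption about how such a composition might look: it uses the standard library's `enumerated()` on an array in place of the not-yet-available async `.enumerated()`, the loop stands in for a `reductions`-style running state, and `droppedCounts` is a hypothetical helper name.

```swift
/// For each received index, report how many elements were skipped just before it.
/// The running `expected` value plays the role of the accumulated reduction state.
func droppedCounts(receivedIndices: [Int]) -> [(index: Int, droppedBefore: Int)] {
    var expected = 0
    var report: [(index: Int, droppedBefore: Int)] = []
    for index in receivedIndices {
        report.append((index: index, droppedBefore: index - expected))
        expected = index + 1
    }
    return report
}

// Tag every element with its position before it enters any buffering stage.
let produced = Array(["a", "b", "c", "d", "e"].enumerated())
// Simulate a lossy stage that discarded the elements at offsets 1 and 2.
let received = produced.filter { $0.offset != 1 && $0.offset != 2 }
// The consumer sees offsets 0, 3, 4 and can tell that two elements went missing before 3.
let report = droppedCounts(receivedIndices: received.map { $0.offset })
```

Because the index travels with the element, the consumer does not need to know which stage dropped the values, only that the indices are no longer contiguous.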

1 Like

Ah, good to know. As long as there's no silent value loss, then a lower default seems fine. Though you mention the buffer dropping below 10 entries. What happens to the producer if the buffer is full? If there's no risk of data loss, what's the purpose of a buffer at all?

1 Like

It makes the sides await the consumption of the buffer until it is not full before allowing any additional iteration.

To allow variation in the rate of consumption; it gives a window of "the sides must be within N items away from each other".

As defaults go, it isn't a horrible one... as a matter of fact, .bounded(1) is not totally unreasonable.
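As an illustration of the "within N items" window, here is a small synchronous model. It is a sketch under assumptions, not the proposed implementation: `SharedWindow` and its members are hypothetical names, and where this model returns `false` the real operator would suspend instead.

```swift
/// A toy model of a bounded shared buffer with several consuming sides.
struct SharedWindow {
    let limit: Int
    private(set) var produced = 0
    private(set) var consumed: [Int]   // number of elements each side has taken

    init(limit: Int, sides: Int) {
        self.limit = limit
        self.consumed = Array(repeating: 0, count: sides)
    }

    /// Production may advance only while the slowest side is fewer than
    /// `limit` elements behind.
    var canProduce: Bool {
        guard let slowest = consumed.min() else { return true }
        return produced - slowest < limit
    }

    mutating func produce() -> Bool {
        guard canProduce else { return false }   // the real operator would await here
        produced += 1
        return true
    }

    mutating func consume(side: Int) -> Bool {
        guard consumed[side] < produced else { return false }   // nothing pending for this side
        consumed[side] += 1
        return true
    }
}

var window = SharedWindow(limit: 1, sides: 2)
let first = window.produce()    // the buffer is empty, so production proceeds
let second = window.produce()   // both sides still owe one element
_ = window.consume(side: 0)
let third = window.produce()    // side 1 is still behind, so production stays paused
_ = window.consume(side: 1)
let fourth = window.produce()   // every side has caught up
```

In this model nothing is ever discarded; a slow side only delays the next element for everyone.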

Side Question on that

This is actually an interesting idea I would like to hear more about… this is sort of like "AsyncChannel with buffering" or "AsyncStream with backpressure"? Are there concrete sequences shipping in Standard Library or AsyncAlgorithms today where I can see this in action?

Particularly this option was lifted from here:

1 Like

Doesn’t that mean there would then be a symmetric data loss to all downstream consumers, or “sides” as you call them, if any one of them didn’t consume the elements fast enough? That sounds similarly problematic to me as the earlier suggestion of letting any consumer cancel all of them.

There certainly isn’t any pushback to the upstream producer’s side because async sequences are produced via unblocking sync continuation calls.

No, there is no data loss: it is guaranteed that every produced element will be received by every active consumer. However, what you will observe is that a slow consumer can bring the entire system to a stop until it consumes the next element from the internal buffer of share, which will trigger a new next call on the base async sequence that is being shared. In a healthy system this won’t be a problem, since all consumers are expected to handle elements in a reasonable amount of time.

1 Like

Ah, right, thanks!

And that’s because the (original) upstream would be calling yield(_:) or yield(with:), immediately receiving a YieldResult informing about the (slowest consuming) downstream.

So as long as the upstream chooses not to discard elements at that point, there’ll be no data loss. I stand corrected!
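For reference, this is roughly what inspecting a `YieldResult` looks like with a plain `AsyncStream` from the standard library. It is a sketch that does not involve share at all; the `.bufferingOldest(1)` policy and the `yieldOutcomes` helper are choices made for this example only.

```swift
/// Yields two values into a one-element buffer that nobody is consuming
/// and records what the continuation reports for each yield.
func yieldOutcomes() -> [String] {
    let (stream, continuation) = AsyncStream.makeStream(
        of: Int.self,
        bufferingPolicy: .bufferingOldest(1)
    )
    var outcomes: [String] = []
    // Keep the stream alive while yielding so the continuation is not terminated early.
    withExtendedLifetime(stream) {
        for value in 1...2 {
            switch continuation.yield(value) {
            case .enqueued(let remaining):
                outcomes.append("enqueued \(value), remaining \(remaining)")
            case .dropped(let element):
                // The producer learns about the loss here and can decide how to react.
                outcomes.append("dropped \(element)")
            case .terminated:
                outcomes.append("terminated")
            @unknown default:
                outcomes.append("unknown")
            }
        }
    }
    continuation.finish()
    return outcomes
}

let outcomes = yieldOutcomes()
```

With this policy the second yield finds the buffer full, so the producer is told that the new element was dropped rather than being suspended.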

This point reminds me of another potential axis on our behaviour matrix. That of determining which actor the multicast iterator is isolated to.

The proposal currently has the iterator running as a detached Task. It’s the same solution I came up with when I was experimenting with a multicast algorithm as it doesn’t (typically) make sense to have one side owning the iteration.

However, this risks more actor-hopping than necessary, and would make the algorithm less useful in UI contexts for example.

It would seem to be a good thing if the user had control over the isolation context of the shared iterator. The aim being to allow synchronised source and consumer isolation and avoid actor-hopping.

Perhaps easier said than done.

In a typical scenario, it would probably make sense for the shared iterator to have the same isolation as the source sequence. But perhaps sometimes it would be more efficient for the shared iterator to have the same isolation as the consumers so that when an element is received, it can be distributed without suspension to all consumers.

In terms of a solution, perhaps supplying the target actor on which the shared iterator should be constructed would be one solution. Or, if going the AsyncSource direction, a good default could be to inherit the isolation at the point of construction.

Before talking about isolation: the unstructured task is not only there for that reason, but mainly to protect the iteration from cancellation by the downstream.

One solution to allow users to control the isolation is to let them supply the task that is actually doing the iteration but that would change the shape of the method a lot:

let someAsyncSequence = ...
someAsyncSequence.share { sharedAsyncSequence in // We are taking over the calling task here including the isolation
  // Sharing is happening until the end of the scope of this closure
}

This is more in line with Structured Concurrency but it does make such algorithms essentially non-compositional. You can’t chain them linearly anymore but need to do that inside closures.

Inheriting the isolation at construction time is essentially making the type non-Sendable. The isolation can still change when the value is being sent forward. I don’t think it is possible at this time to make a type polymorphic over the isolation, which I think is what you are asking for here.

Yes, I totally get that. I had previously arrived at the same solution myself but was always dissatisfied with the MainActor → Global Actor → MainActor indirection this creates.

Yes, which makes it easier said than done. It’s been a long time since I’ve dug around here, but I imagined passing some async-iterator-constructor function – isolated to the actor of choice – to make this possible. But I’m guessing that would require makeAsyncIterator() to be asynchronous, which of course, it isn’t.

Right, because makeAsyncIterator() would still need to be asynchronous.

Yes, exactly right. I think this is a big limitation and somewhat limits the potential. I hope this can be considered in a broader language discussion.

Thanks Philippe for tackling this proposal. Seeing this operator take shape is very exciting!

I do think not going with a default buffering strategy would be very unfortunate. Swift developers already carry two defaults in their mind: AsyncStream with an unbounded buffer and Combine’s share() with no buffer.

Since Combine is a proprietary framework, I believe consistency with the precedent set by the standard library is the best option. However, if we believe the proposed share's usage characteristics are different enough from AsyncStream to warrant a change, matching Combine’s behavior still seems preferable to me than no default at all.

3 Likes

I’m not sure that logic really holds; we should have a safe option for all targets - @FranzBusch did convince me that having unbounded does cause problems with server-side development (and some desktop development too).

Generally though, the progressive disclosure concept of having the simple thing be the right thing and then offering more complex things for more complex behaviors is a really useful guiding principle. After more discussion out of band, there is an option for a default value that would be suitable for desktop, mobile AND server-side development: .bounded(1). That policy would mean that iterating would only happen until the side has consumed the buffer to the given point; 1 element left. This is really close to the behavior of other systems; for example, Combine’s share operator is effectively a buffer of 1 element.

I think it would be reasonable to move forward with that as a middle ground. It would change the proposal from a default of .unbounded (meaning no limit to the backing buffer) to .bounded(1) meaning a maximum shared buffer of 1 element and ensuring the production is always within 1 element of the consumption.

12 Likes

I can see that the current state of the work is that this new share() API is gated to macOS 26.0 — is that the intention for the end state of the work, or will this back deploy to earlier releases? Most of us in the app development world are still targeting minimum OS releases that are 2+ years old.

4 Likes

That sounds good to me. A default of bounded(1) (or any other arbitrary number tbh) is by far better than unbounded; it isn’t desirable to “accidentally” end up with unbounded behavior. It also follows the “customize only if you care” principle, which is what applies most of the time here for a lot of folks.

So I’m supportive of that default, and allowing customization when necessary :+1:

3 Likes

The mechanisms used have a minimum required version of macOS 15.0 et al. The '26 release is just a placeholder.

5 Likes

I feel like we have reached a consensus around this pitch at this point; in the next few days I will be merging this algorithm and moving on to the next most requested algorithm: MultiProducerSingleConsumerChannel from @FranzBusch.

Thanks to everyone who has participated and given the very useful feedback.

14 Likes

Maybe this is more of a feedback for the diff review… but do we know what kind of horizontal dependencies we expect between share and the rest of the package? Could this potentially ship in a "subpackage" target under AsyncAlgorithms so that a product engineer that needs only share could link just against that code and not pay a binary size and compile time penalty for code they might not need?

a question that occurred to me recently: since the proposed API intends to expose the share() method as an extension on AsyncSequence and erase the specific return type, what is the intended behavior if a client calls share() more than once? e.g.

let shared1 = base.share()
// do something with shared1
let shared2 = shared1.share()
// do something with shared2

the currently proposed implementation doesn't publicly expose the type that implements the sharing adapter logic, so it's not clear that client code could test for the condition 'is the sequence already shared?' if it cared to do that.

i was wondering about this from the perspective of 'could/should share() be idempotent?'. my intuition is that it should generally try to be[1], but also i find myself having a hard time thinking through how chaining multiple share() calls together would actually work vis-a-vis the current proposal.


  1. but the requested buffering behaviors may differ, so it may not be possible to avoid creating another sequence in those cases

2 Likes

This has come up before and we could leverage package traits to move each algorithm behind a trait and enable all by default. This would allow other packages to only pick the algorithms they need.
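As a rough idea of what that could look like in a manifest, here is a sketch using SwiftPM package traits (available from tools version 6.1). The trait names `Share` and `Merge` and the single-target layout are assumptions for illustration only, not the package's actual configuration.

```swift
// swift-tools-version: 6.1
import PackageDescription

// Hypothetical layout: one trait per algorithm, all enabled by default.
let package = Package(
    name: "swift-async-algorithms",
    products: [
        .library(name: "AsyncAlgorithms", targets: ["AsyncAlgorithms"])
    ],
    traits: [
        .trait(name: "Share", description: "Builds the share() algorithm"),
        .trait(name: "Merge", description: "Builds the merge() algorithm"),
        // Default traits apply when a dependent package does not select any explicitly.
        .default(enabledTraits: ["Share", "Merge"]),
    ],
    targets: [
        // Sources would wrap each algorithm in `#if Share ... #endif` and so on.
        .target(name: "AsyncAlgorithms")
    ]
)
```

Under this sketch, a dependent package that only needs share would pass `traits: ["Share"]` in its own `.package(...)` declaration, so code guarded by the other traits would not be compiled.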

6 Likes