Integrate some work from AsyncExtensions?

Hi everyone

Really really nice repo/initiative. I’ve myself open sourced a similar repo a few weeks ago GitHub - sideeffect-io/AsyncExtensions: AsyncExtensions aims to mimic Swift Combine operators for async sequences.

I was wondering if it would make sense to integrate (after reworking) some of the operators in the newly Apple supported Async Algo repo? I guess you already plan to work on operators like ‘withLatestFrom’ or ‘switchLatest’ and you might already have a design in mind for that. I’d be happy to help and contribute.

Anyway thanks a lot for your work.



Hi @Philippe_Hausler

After trying to understand the implementation of the merge operator, wouldn't be simpler to have some form of regulation of the tasks handling each base iterators and merging their output using a channel.

A regulation could be like AsyncExtensions/ConcurrentAccessRegulator.swift at main · sideeffect-io/AsyncExtensions · GitHub where you can request the next value from an iterator only if the iterator is available (not already being waiting for the next value, using a gate to let the caller pass or not). And once the value is emitted then use a channel to streamline all the values from all the iterators to the client loop.

Keeping an internal tuple of states (idle/pending/terminate), to submit a combination of existing and new tasks to is clever but not trivial to understand, and not that easy to extend to a form of variadic Base parameters.

other question: Why not using an Actor to manage the concurrent access to TaskSelectState ?

Thanks a lot, and keep the good work.

I think one of the major premises of this package is to centralize stuff that is useful. Per switchToLatest; yea I had a draft implementation for that but it was really only a draft - it is missing documentation, a guide, tests, and a proposal; so in truth there is a lot to still do. Plus I think we have a backlog of reviews of each algorithm to get through - in my opinion (which I think is shared with a lot of folks in the swift community) even though the evolution pitches can be a bit involved they bring out some really fantastic ideas and measure the excitement/usefulness of APIs.

Per merge, part of the back-story there that maybe needs to be captured in the guide a bit more explicitly is that we wanted the multiple-input algorithms to let each side of them run free when a demand for the next value is present. This means they should all run their iteration in parallel. I agree that the combination becomes a bit unwieldy as N increases. I was working on a simplification which dramatically reduced that explosion but sadly it ended up being unacceptable performance. So what you see with some of them is where we are trying to eke out really highly performant algorithms and sometimes that means that the code size grows.

Per the TaskSelectState that was just the approach I happened to take - im sure there are better ways to accomplish that - I would really like to reduce the performance impact of considerably. The only way I have come up with so far that meets it's behavioral requirements is actually to sink it into the _Concurrency library and implement it mostly in the runtime itself. For obvious reasons that is not reasonable to do in a package. If you have an idea on how to improve this I strongly encourage sharing that; I would be more than happy to iterate on it together.

Thanks a lot for your answer @Philippe_Hausler.

I you take a look at AsyncExtensions/ConcurrentAccessRegulator.swift at main · sideeffect-io/AsyncExtensions · GitHub you can see that it does what you suggest by allowing each iteration happening in parallel and does not increase the size of the code when N gets bigger. It is also used on operators like multicast since there are concurrent client loops that request a next value. The tradeoff is that is implies to use a Channel to merge all the values into a single output.

Here is the code of merge and multicast for reference. Maybe that can help bring new ideas. I have not tested those implementations regarding performance though!

I'll try also to come up with an implementation of multicast that respects the pattern you've implemented for now.

Would love to see some of the AsyncExtensions work make it over to this package.

Especially the ability to have multiple for await in loops running on an async sequence at the same time.

I guess the first step would be to implement some kind of BroadcastChannel so we can reuse it in a ‘multicast’ like operator.

@Philippe_Hausler would it make sense to make a proposal about integrating a BroadcastChannel equivalent in the repo ? And would it make sense to rely on AsyncStream to handle the buffering of values ?


Hi @Philippe_Hausler

I see there are several discussions going on about having a way to broadcast values to several clients.
If you can confirm it would make sense to have that in this repo I guess it would worth bootstrapping a topic to group all the requirements (perhaps leading to different kinds of implementations)

  • should it be buffered?
  • should the send function be async, waiting for all clients to have consumed the value?
  • should the send function be async, waiting for the buffer to have free space when currently full?

What do you think?

CC @jmjauer @BrentM

I think those are all good questions; there is another on my mind. share in Combine was always a point of learning difficulty. Getting to that point was somewhat a struggle for those who think with an imperative mainframe. Not everyone has a modality of thinking that is functional. Share in my opinion was perhaps harder to grasp for some folks than even the initial hurdle between reference and value types (conceptually they have some overlaps). The question is: can we do better?

Is this problem something we can build tooling for that defines away that difficulty? This is why I favor the concept of the awaiting all clients to consume a value; more as an algorithm on an existing async sequence than say a channel-like thing. Because then we could perhaps inference the behavior via the type system.

For example:

extension AsyncSequence {
  func goodNameGoesHere() -> (AsyncSideForABetterName<Self>, AsyncSideForABetterName <Self>)

The advantage via using a tuple return means that all of the sides MUST be used. The language enforces the concept of using all of the sides. If one is not used; it emits a warning. Obviously this is not the only way to solve it. That concept of leveraging the language to advance this feels more along the right path for at least part of the problem space. Perhaps in time we may find that there are more than just two or three solutions that deserve distinct representations.

From a material standpoint; autoconnect is almost NEVER used besides timer and multicast with Combine from my experience. In those cases it feels like a vestigial organ serving only a ritualistic "this is what you do" type usage. Which feels to me as perhaps something we want to avoid. Share is also rather mysterious for folks, the placement isn't immediately obvious to newcomers to Combine. I would like to avoid that mysteriousness.

P.S. To me multicast, share and such all make great sense to me, but I want to make sure we add things for their ease of use and utility and not "just because Combine did it".


I see what you mean and I totally get that we have to somehow break our habits.

Indeed, having a kind of AsyncSequence that awaits that every client has consumed the next element to allow the next value to be requested would eliminate the need to deal with « connect » … but only in the case where you know in advance the numbers of « child » AsyncSequence you want to create, I guess this is why you return a tuple in your example? (It is somehow a kind of « pipe » operator).

We should also be able to create undefined number of « children », connected to the same parent AsyncSequence, but in that use case we cannot guarantee that some elements won’t be lost for the children created after elements have been consumed (we could perhaps use a buffer to replay those values).

To sum up, I guess that there are 2 topics here:
1 - sharing -> being able to produce children AsyncSequences connected to the same parent AsyncSequence. All the children should consume the next value so the parent can be requested again (could perhaps reuse the concept of AsyncChannel?). It can solve the « connect » issue when the number of children is known.
2 - bridging the imperative world with AsyncSequence by providing a « broadcastchannel like » tool with non suspendable send. If we ever want to share that tool between several clients, then we can use the operator described in (1).

Do you think it goes in the good direction @Philippe_Hausler ?

In regards to your second question.

I think that being able to await for a value to be completely consumed by all clients is definitely beneficial and a welcome addition.

But in most cases, the combine/rx send-and-forget behavior is generally good enough.

Do you think it would make sense to have a .send() and a .asyncSend() method on async sequences to handle both of these use cases?

or would something like: async let asyncSend = myAsyncStream.send("some value") be good enough for the situations where you don't care about waiting for all clients to consume the value?

As for buffers, I would love if we added a .buffer(size: Int) operator.

So rather than having an async stream similar to CurrentValueSubject, you could just add a buffer to any AsyncStream with the number of previous values you want to repeat when a new client is added:

let stream = MyAsyncStream<String>.buffer(size: 1)

func main() {

   for await value in stream {
       print("\(value)") // Prints "B"

Just throwing one thought out here fairly quickly:

Perhaps it'd be possible to have a fixed size buffer for back pressure and have async sends and model it on the LMAX disruptor (LMAX Disruptor: High performance alternative to bounded queues for exchanging data between concurrent threads) - this gives back pressure when the ringbuffer is full but allows for some nice performance characteristics (with e.g. batching effects when reading with little synchronisation overhead). Not considering the full set of configurations that the Disruptor supports, but specifically the producer -> multi consumers in this case. (more reference material here: Blogs And Articles · LMAX-Exchange/disruptor Wiki · GitHub)