Hi everyone
The idea of sharing elements from an `AsyncSequence` between several consumers has been discussed here and there on the forum.
The idea comes from what we know as a sharing operator in the reactive world (Combine's `share()`). It relies on the concept of multicasting values from an upstream `Publisher` to any number of `Subscriber`s using a `Subject` (because a `Subject` shares, by design, its values across its subscribers).
The `share()` operator is built on top of three mechanisms: a `multicast` operator, a `Subject`, and an implicit `autoconnect`.
@Philippe_Hausler suggested that we should try to find a way to accomplish a sharing strategy without the complexity of the Combine implementation.
The `connect` mechanism makes it possible to wait until all the subscribers are ready to process values before starting the subscription, so nobody misses a value.
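To make the `connect` step concrete, here is a minimal Combine sketch (it needs an Apple platform where Combine is available). The variable names are only illustrative. Both subscribers attach first, and the upstream starts only when `connect()` is called. As far as I understand it, `share()` does the same thing but connects automatically on the first subscription.

```swift
import Combine

// `multicast` turns the upstream into a ConnectablePublisher backed by a Subject.
let multicasted = [1, 2, 3].publisher
    .multicast { PassthroughSubject<Int, Never>() }

var firstValues: [Int] = []
var secondValues: [Int] = []

// Both subscribers attach before anything is emitted.
let firstSubscription = multicasted.sink { firstValues.append($0) }
let secondSubscription = multicasted.sink { secondValues.append($0) }

// The upstream subscription only starts here, once everybody is ready.
let connection = multicasted.connect()

print(firstValues, secondValues)
```

If the `connect()` call were replaced by `autoconnect()`, the first `sink` would trigger the upstream immediately, and with a synchronous upstream like this one the second subscriber would be too late to see the values.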
One way of not being forced to explicitly `connect` to the "shared" `AsyncSequence` would be to know in advance the number of client loops that will iterate over it. With that information, we could request the next element from the "shared" `AsyncSequence` only when all the clients have themselves requested the next element. The downside of this strategy is that the number of clients is fixed and finite, so the `AsyncSequence` is not "shared" per se, as we understand it from the Combine implementation.
If we go down that road, then maybe we should not use the "share" terminology. Perhaps something like `unzip` would make more sense.
- `zip`: we combine async sequences into a single async sequence whose elements are tuples of the latest elements from each part
- `unzip`: we uncombine a root `AsyncSequence` into child `AsyncSequence`s whose elements are the latest elements from the root `AsyncSequence`
let sequence = [1, 2, 3].async
let (seq1, seq2) = sequence.unzip()

Task {
    for await element in seq1 {
        print("Task1: \(element)")
    }
}

Task {
    for await element in seq2 {
        print("Task2: \(element)")
    }
}

// will print:
// Task1: 1
// Task2: 1
// Task2: 2
// Task1: 2
// Task1: 3
// Task2: 3
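For discussion purposes, here is a minimal sketch of one possible shape for a two-client `unzip()`. It is not the draft implementation. `LockStepCoordinator` and `runUnzipDemo` are hypothetical names, errors from the root sequence are simply treated as the end of the sequence, and cancellation is not handled at all. It waits for every client on every iteration, and uses a plain `AsyncStream` as the root so it does not depend on any package.

```swift
// Hypothetical coordinator (not part of any library): it pulls one element from
// the root sequence only once all `clientCount` clients have asked for it.
actor LockStepCoordinator<Base: AsyncSequence> where Base.Element: Sendable {
    private var iterator: Base.AsyncIterator
    private let clientCount: Int
    private var parked: [CheckedContinuation<Base.Element?, Never>] = []

    init(_ base: Base, clientCount: Int) {
        self.iterator = base.makeAsyncIterator()
        self.clientCount = clientCount
    }

    func next() async -> Base.Element? {
        if parked.count + 1 < clientCount {
            // Some clients have not asked yet: suspend until the last one arrives.
            return await withCheckedContinuation { parked.append($0) }
        }
        // Last client of this round: take over the waiters and pull from the root.
        let waiters = parked
        parked.removeAll()
        var local = iterator                   // work on a copy across the suspension
        let element = try? await local.next()  // simplification: an error ends the sequence
        iterator = local
        for waiter in waiters { waiter.resume(returning: element) }
        return element
    }
}

extension AsyncSequence where Element: Sendable {
    // Hypothetical two-client `unzip()`; the name follows the suggestion in this post.
    func unzip() -> (AsyncStream<Element>, AsyncStream<Element>) {
        let coordinator = LockStepCoordinator(self, clientCount: 2)
        return (
            AsyncStream<Element>(unfolding: { await coordinator.next() }),
            AsyncStream<Element>(unfolding: { await coordinator.next() })
        )
    }
}

// Runs two client loops over the same root and returns what each one saw.
func runUnzipDemo() async -> ([Int], [Int]) {
    let root = AsyncStream<Int> { continuation in
        for value in 1...3 { continuation.yield(value) }
        continuation.finish()
    }
    let (seq1, seq2) = root.unzip()
    let task1 = Task { () -> [Int] in
        var seen: [Int] = []
        for await element in seq1 { seen.append(element) }
        return seen
    }
    let task2 = Task { () -> [Int] in
        var seen: [Int] = []
        for await element in seq2 { seen.append(element) }
        return seen
    }
    return (await task1.value, await task2.value)
}

let (first, second) = await runUnzipDemo()
print(first, second) // prints: [1, 2, 3] [1, 2, 3]
```

With this shape, the slowest client sets the pace for everyone, and a client that stops iterating early leaves the other one suspended forever.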
Here is a list of questions/considerations that came to my mind while trying to implement a draft version / tests:
- If waiting for all the clients to request the next element lets us get rid of the `connect` mechanism, should we do that for the first iteration only or for every iteration? If we do it for the first one only, then we perhaps need a buffering mechanism; if we do it for every iteration, then the release of the next element will be regulated by the pace of the slowest client.
- Should we wait for all the clients to request the next element before requesting it from the root `AsyncSequence`, or should we optimise this operation by requesting the next element from the root `AsyncSequence` as soon as the first client has requested it, and releasing the element to every client only when the last one has made its request?
- What should happen when one of the client loops is cancelled? Should we finish all the client iterations? Should we continue with the rest of the clients?
There are other considerations to take into account for sure, and it is the goal of this post to gather them so we can refine the need for such an operator.
Thanks for reading. Don't hesitate to play with the draft implementation.