[Pitch] unzip

Hi everyone

The idea of sharing elements from an AsyncSequence between several consumers has been discussed here and there on the forums.

The idea comes from what we know as a sharing operator in the reactive world (Combine Share). It relies on the concept of multicasting values from an upstream Publisher to any number of Subscribers using a Subject (because a Subject shares, by design, its values across its subscribers).

The share() operator is built on top of three mechanisms: a multicast operator, a Subject and an implicit autoconnect.

@Philippe_Hausler suggested that we should try to find a way to accomplish a sharing strategy without the complexity of the Combine implementation.

The connect mechanism makes it possible to wait for all the subscribers to be ready to process values before starting the subscription, so nobody misses a value.

One way to avoid having to connect explicitly to the "shared" AsyncSequence would be to know in advance the number of client loops that will iterate over it. With that information, we could request the next element from the "shared" AsyncSequence only once all the clients have themselves requested the next element. The downside of this strategy is that the number of clients is fixed. The AsyncSequence is then not "shared" per se, as we understand it from the Combine implementation.
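
To make the "wait for every client" idea concrete, here is a minimal sketch of the bookkeeping only. `DemandGate` and its members are hypothetical names used for illustration; they are not part of the draft implementation or of any library, and the sketch ignores concurrency (a real version would need to protect this state).

```swift
/// Hypothetical bookkeeping for a sequence consumed by a fixed number of clients.
/// It only tracks which clients have asked for the next element.
struct DemandGate {
  let clientCount: Int
  private var waiting: Set<Int> = []

  init(clientCount: Int) {
    precondition(clientCount > 0, "at least one client is required")
    self.clientCount = clientCount
  }

  /// Records that `client` requested the next element.
  /// Returns true when every client has now requested it, meaning the base
  /// sequence can be asked for its next element; the gate then resets.
  mutating func request(from client: Int) -> Bool {
    precondition((0..<clientCount).contains(client), "unknown client")
    waiting.insert(client)
    guard waiting.count == clientCount else { return false }
    waiting.removeAll()
    return true
  }
}

var gate = DemandGate(clientCount: 2)
let afterFirst = gate.request(from: 0)   // false: client 1 has not asked yet
let afterSecond = gate.request(from: 1)  // true: both asked, pull from the base
```

Because the gate resets after each round, the base sequence never runs ahead of the slowest client, which is exactly the trade-off described above.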

If we go down that road, then maybe we should not use the "share" terminology.

Perhaps something like unzip would make more sense.

  • zip: we combine several async sequences into a single async sequence whose elements are tuples of the latest elements from each part
  • unzip: we uncombine a root AsyncSequence into child AsyncSequences whose elements are the latest elements from the root AsyncSequence.
let sequence = [1, 2, 3].async
let (seq1, seq2) = sequence.unzip()

Task {
  for await element in seq1 {
    print("Task1: \(element)")
  }
}

Task {
  for await element in seq2 {
    print("Task2: \(element)")
  }
}

// will print:
// Task1: 1
// Task2: 1
// Task2: 2
// Task1: 2
// Task1: 3
// Task2: 3

Here is a list of questions/considerations that came to my mind while trying to implement a draft version / tests:

  • If waiting for all the clients to request a next element lets us get rid of the connect mechanism, should we do that for the first iteration only or for every iteration? If we do it for the first one only, then we perhaps need a buffering mechanism; if we do it for every iteration, then the release of the next element will be regulated by the pace of the slowest client.
  • Should we wait for all the clients to request the next element before requesting the next element from the root AsyncSequence, or should we optimise this operation by requesting the next element from the root AsyncSequence as soon as the first client has requested it, and releasing the element to every client only when the last one has made its request?
  • What should happen when one of the client loops is cancelled? Should we finish all the client iterations? Should we continue with the rest of the clients?

There are other considerations to take into account for sure, and it is the goal of this post to gather them so we can refine the need for such an operator.

Thanks for reading. Don't hesitate to play with the draft implementation.


That is a unique approach on the implementation.

There are a few minor details that need to be worked out -

  • linking Foundation for UUID is probably a no-go; I wonder - do we really need a globally unique thing? or just unique between the two sides? Would two ints work? If so could it be devolved even further to just a tuple instead of a dictionary?
  • calling it unzip is interesting but I would expect unzip to do the opposite of zip; zip takes two AsyncSequences and forms a singular AsyncSequence with a tuple of the elements of the bases. So I would expect unzip to take a singular AsyncSequence of tuples (2 members) and form two AsyncSequences. Likewise I'd expect that it could take an AsyncSequence of tuples (3 members) and form three AsyncSequences. i.e. zip(a, b).unzip() should return practically a tuple (a, b)
  • This definitely needs to be tested heavily for its thread safety - I wasn't able to follow all of it with a casual read but it looks kinda tricksy.
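
The round-trip expectation in the second bullet can be illustrated with plain synchronous sequences. This is only an analogy: the free function `unzip` below is hypothetical (the standard library provides `zip` but no `unzip`), and it collects into arrays rather than producing lazy or asynchronous sequences.

```swift
/// Hypothetical synchronous `unzip`, mirroring the standard library's `zip`.
/// It takes a sequence of 2-tuples and returns the two component arrays.
func unzip<S: Sequence, A, B>(_ pairs: S) -> ([A], [B]) where S.Element == (A, B) {
  var firsts: [A] = []
  var seconds: [B] = []
  for (first, second) in pairs {
    firsts.append(first)
    seconds.append(second)
  }
  return (firsts, seconds)
}

let numbers = [1, 2, 3]
let names = ["one", "two", "three"]
let (numbersBack, namesBack) = unzip(zip(numbers, names))
// numbersBack == numbers and namesBack == names: unzip undoes zip
```

An asynchronous version would presumably keep the same shape at the type level: one input whose Element is a tuple, and one output sequence per tuple member.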

The unzip characteristic actually feels like you have managed to use the language itself to make this feel a bit more at home.


One interesting fallout here btw is that if we were to make the symmetrical version it might actually mean that zip+map and map+unzip would be really common. Almost common enough to make them their own things. (Other frameworks do this too...)

@Philippe_Hausler Indeed, the implementation was an attempt to help me think about the use cases and the pitfalls. It cannot be considered a basis for a proposal at this stage.

I agree with you about the naming, unzip should be the symmetrical operation to zip and should only apply to tuples.

I've tried another implementation that I've called split (could be divide?, share?, clone? ...). It relies on AsyncChannel to handle the producer/consumer waiting mechanism: AsyncSplit2Sequence. This implementation is not complete and surely does not handle edge cases and cancellation strategies. Some tests

I've also provided an implementation of a hypothetical unzip operator, which is basically a map over a split:

public extension AsyncSequence {
  func unzip<A, B>() -> (AsyncMapSequence<AsyncSplit2Sequence<Self>, A>, AsyncMapSequence<AsyncSplit2Sequence<Self>, B>) where Element == (A, B) {
    let (seq1, seq2) = self.split()
    return (seq1.map { $0.0 }, seq2.map { $0.1 })
  }
}

I'm trying not to focus too much on the implementation; instead, I hope this thread can gather as much feedback as needed to shape one operator, or a family of operators, that would make it possible to share the elements of an AsyncSequence.

I can sum up the questions I have so far:

  • In order to avoid the "connect" mechanism, is it a good idea to have a finite number of "child" iterations that we can wait for to ask the base sequence to produce the next element?

  • Should we always wait for every "child" iteration to request a next element before asking the base sequence to produce the next element, or only the first time? Should we consider a "buffered" version of the split operator?

  • What should happen when a "child" iteration is cancelled, given the constraint of waiting for every "child" to request a next element?

Connect implies autoconnect, which to be honest always seemed like a thing that just hung off the pipeline more out of tradition than for any use case. I'd like to avoid that type of situation.

Having uncontrolled buffers in a program is basically tantamount to a leak. If someone is not careful they can buffer up an unbounded number of elements into that buffer and experience a memory pressure event; which in the end for the user of the app using that would be less than ideal.
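
If a buffered variant were ever considered, the buffer would need an explicit bound. The sketch below is purely illustrative: `BoundedBuffer` is a hypothetical type, and dropping the oldest element when full is just one assumed policy (similar in spirit to `AsyncStream`'s `.bufferingNewest` buffering policy); suspending the producer instead would be another option.

```swift
/// Hypothetical per-child buffer with a hard upper bound on its size.
struct BoundedBuffer<Element> {
  let capacity: Int
  private(set) var elements: [Element] = []

  init(capacity: Int) {
    precondition(capacity > 0, "capacity must be positive")
    self.capacity = capacity
  }

  /// Appends `element`. When the buffer is over capacity, the oldest element
  /// is removed and returned so the caller can see what was dropped.
  mutating func append(_ element: Element) -> Element? {
    elements.append(element)
    return elements.count > capacity ? elements.removeFirst() : nil
  }
}

var buffer = BoundedBuffer<Int>(capacity: 2)
var dropped: [Int] = []
for value in 1...4 {
  if let oldest = buffer.append(value) { dropped.append(oldest) }
}
// buffer.elements == [3, 4], dropped == [1, 2]
```

Whatever the policy, memory use stays proportional to `capacity` instead of growing with how far a slow child lags behind.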

This also simplifies the concept of cancellation: if you cancel a child, it checks whether it was the last child to be cancelled. If so, then in addition to the normal "stop the state of iteration and release resources" it also stops the parent's state of iteration and releases resources.

let (a, b) = input.unzip()
let t1 = Task {
  for await item in a { }
}
let t2 = Task {
  for await item in b { }
}
// after a bit a and b are active
// wait a bit more
t1.cancel()
// a is terminal and finishes task 1 but b is still iterating
// wait a bit more
t2.cancel()
// b is terminal and finishes task 2, and then the input is cancelled.
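
The "last child cancels the parent" rule above boils down to a small piece of state. Here is a minimal sketch of it; `ChildTracker` is a hypothetical helper, not something from the linked implementations, and it leaves out the locking a real implementation would need.

```swift
/// Hypothetical bookkeeping for the cancellation rule: the base iteration is
/// stopped only once the last remaining child has been cancelled.
struct ChildTracker {
  private var active: Set<Int>

  init(childCount: Int) {
    active = Set(0..<childCount)
  }

  /// Marks `child` as cancelled.
  /// Returns true when no child is left, so the base iteration should stop too.
  mutating func cancel(child: Int) -> Bool {
    active.remove(child)
    return active.isEmpty
  }
}

var tracker = ChildTracker(childCount: 2)
let stopAfterFirst = tracker.cancel(child: 0)   // false: the other child is still iterating
let stopAfterSecond = tracker.cancel(child: 1)  // true: last child gone, cancel the base
```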

Thanks for the insights @Philippe_Hausler

I've come up with a better (IMO) implementation using AsyncChannel.

Source AsyncSplit2Sequence
Tests TestSplit

Source AsyncUnzip2Sequence
Tests (incomplete) TestUnzip

Do you think we should move forward with this implementation and write a proposal?

Thanks.

Here is the new branch for the implementation:

Tests:
