Async Zip

Combined AsyncSequences

I've been trying to wrap my head around async sequences. While the concept is simple and I think I understand it well, I am specifically struggling with writing methods that combine and transform sequences. Ideally, I'd like these methods to support backpressure as well. As an example, I've been trying to implement zip.

Here's an implementation that uses async let to implement zip: AsyncZipped.swift. When I enable -Xfrontend -warn-concurrency -Xfrontend -enable-actor-data-race-checks, the compiler warns that the iterators are not Sendable, so I'm pretty sure my implementation is incorrect.

In my first attempt, I was only able to implement zip by storing closures to the next function of the underlying iterators. While I did get it to compile, this also produces a bunch of warnings related to Sendable. (To be honest, I was surprised it worked at all.)

I was wondering if anyone could weigh in on whether this approach is futile or whether it makes sense. My guess is that in order to write zip, these underlying streams have to be Sendable? (I'm saying this although I don't fully understand Sendable yet.)

I have been looking into some of the more interesting algorithms, such as zip. Your assertion about Sendable is correct for that family of algorithms. The underlying reason is that to process two things simultaneously, you need to send values across to the tasks running concurrently. In short, such operations need at least the iterator and the element to be Sendable.

I would guess that a zip of two async sequences would have the following signature (at least until we get variadic generics):

```swift
public func zip<Base1: AsyncSequence, Base2: AsyncSequence>(_ base1: Base1, _ base2: Base2) -> AsyncZip2Sequence<Base1, Base2>
  where Base1: Sendable,
        Base2: Sendable,
        Base1.AsyncIterator: Sendable,
        Base2.AsyncIterator: Sendable,
        Base1.Element: Sendable,
        Base2.Element: Sendable
```

Technically, the Base1: Sendable and Base2: Sendable constraints are not needed, but they seem reasonable to expect.

Thank you, that makes sense. I've added those constraints to the first gist, and now it compiles without any warnings. The second variant (storing the closures) won't work, right? Once I add @Sendable to the properties storing the closures, I can't mutate the underlying iterator anymore.

In the first variant, I'm a bit worried about newAndNext; it feels wrong to me. Do you know if there's a cleaner way to write it?

As for newAndNext: something that achieves the same thing is pretty much inevitable, because you need to write back the result of the iteration. Iteration can be expressed either as mutation or as held referential state.
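
A minimal sketch of why the write-back is needed, assuming a value-type iterator. The names `advanced(_:)` (playing the role of newAndNext), `CountingIterator` and `writeBackDemo()` are hypothetical, chosen for illustration. The child task created by async let works on its own copy of the iterator, so the caller has to replace its iterator with the one that comes back:

```swift
// Hypothetical helper in the spirit of `newAndNext`: advance a copy of the
// iterator and hand both the advanced copy and the element back to the caller.
func advanced<I: AsyncIteratorProtocol & Sendable>(
    _ iterator: I
) async throws -> (I, I.Element?) where I.Element: Sendable {
    var copy = iterator
    let element = try await copy.next()
    return (copy, element)
}

// Hypothetical value-type iterator that yields 1, 2, 3, ... forever.
struct CountingIterator: AsyncIteratorProtocol, Sendable {
    var current = 0
    mutating func next() async -> Int? {
        current += 1
        return current
    }
}

func writeBackDemo() async throws -> (first: Int?, stale: Int?, resumed: Int?) {
    let original = CountingIterator()
    // The child task receives its own copy of `original`.
    async let step = advanced(original)
    let (advancedIterator, first) = try await step

    // `original` never moved, so asking it again repeats the first element.
    var stale = original
    let repeated = await stale.next()

    // Writing back the returned iterator continues where the child left off.
    var current = advancedIterator
    let resumed = await current.next()
    return (first: first, stale: repeated, resumed: resumed)
}
```

The alternative the reply mentions, held referential state, would replace the struct with a reference type; that avoids the write-back but then the shared state itself has to be made safe to touch from several tasks.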

So the outline of functionality I would expect from zip would be this:

  • The iterator invokes next concurrently on the iterators of two or more async sequences.
  • When both calls to next return a value, the result is the tuple of those values, paired up.
  • If any of the iterators returns nil from its call to next, nil is returned immediately and the others are cancelled.
  • If any of the iterators throws from its call to next, the error is rethrown and the others are cancelled.

The first expectation implies some sort of concurrent access, either structured or unstructured.
The second expectation determines the behavior of those iterations with respect to the output.
The third and fourth describe the terminal states and how they should handle cancellation.
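
A minimal sketch of that outline using structured concurrency (async let) and the constraints from the proposed signature. Assumptions: `AsyncZip2Sequence`, `asyncZip` (named to avoid clashing with the standard library's `zip`) and the `ArrayAsyncSequence` helper are hypothetical; `next()` is declared plainly `throws` rather than rethrowing, which is exactly the open rethrows question; and it awaits the first base before the second, so termination is detected in that order.

```swift
// Hypothetical zip of two async sequences, mirroring the proposed constraints.
struct AsyncZip2Sequence<Base1: AsyncSequence, Base2: AsyncSequence>: AsyncSequence
where Base1: Sendable, Base2: Sendable,
      Base1.AsyncIterator: Sendable, Base2.AsyncIterator: Sendable,
      Base1.Element: Sendable, Base2.Element: Sendable {
    typealias Element = (Base1.Element, Base2.Element)

    let base1: Base1
    let base2: Base2

    func makeAsyncIterator() -> Iterator {
        Iterator(iterator1: base1.makeAsyncIterator(), iterator2: base2.makeAsyncIterator())
    }

    struct Iterator: AsyncIteratorProtocol {
        // Both are nil once the zipped sequence has finished or thrown.
        var iterator1: Base1.AsyncIterator?
        var iterator2: Base2.AsyncIterator?

        // Advances a copy and returns it so the caller can write it back.
        static func advance<I: AsyncIteratorProtocol & Sendable>(
            _ iterator: I
        ) async throws -> (I, I.Element?) where I.Element: Sendable {
            var copy = iterator
            let element = try await copy.next()
            return (copy, element)
        }

        mutating func next() async throws -> (Base1.Element, Base2.Element)? {
            guard let current1 = iterator1, let current2 = iterator2 else { return nil }
            // Treat the sequence as finished unless both bases produce a value.
            iterator1 = nil
            iterator2 = nil

            // Expectation 1: call next on both bases concurrently.
            async let result1 = Self.advance(current1)
            async let result2 = Self.advance(current2)

            // Expectations 3 and 4: returning nil or throwing leaves this scope,
            // which cancels and then awaits any child task still running.
            let (next1, maybe1) = try await result1
            guard let element1 = maybe1 else { return nil }
            let (next2, maybe2) = try await result2
            guard let element2 = maybe2 else { return nil }

            // Expectation 2: write back the advanced iterators and pair the values.
            iterator1 = next1
            iterator2 = next2
            return (element1, element2)
        }
    }
}

// Generic requirements are inferred from the return type.
func asyncZip<Base1, Base2>(_ base1: Base1, _ base2: Base2) -> AsyncZip2Sequence<Base1, Base2> {
    AsyncZip2Sequence(base1: base1, base2: base2)
}

// Hypothetical Sendable base sequence, used only to exercise the sketch.
struct ArrayAsyncSequence<Element: Sendable>: AsyncSequence, Sendable {
    let elements: [Element]

    struct AsyncIterator: AsyncIteratorProtocol, Sendable {
        var remaining: ArraySlice<Element>
        mutating func next() async -> Element? { remaining.popFirst() }
    }

    func makeAsyncIterator() -> AsyncIterator { AsyncIterator(remaining: elements[...]) }
}
```

Zipping `[1, 2, 3]` with `["a", "b"]` yields two pairs and then stops. Cancellation of the other child task is cooperative, so a base that ignores `Task.isCancelled` will still run until its own `next()` returns.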

The tricky part that remains to be solved in your implementation is rethrowing errors: if you zip two sequences that cannot fail, the composite of the two zipped together should not be able to fail either.

I have been able to do it with unstructured concurrency, but it gets complicated really quickly, bringing in locks and other gnarly bits. Your approach offers some simplicity and avoids some of that by favoring structured concurrency. However, I am not sure that is fully supported yet, since async let does not yet support rethrows by conformance. Task groups do not support that yet either (and I am not sure they really can without some severe signature changes).

Thank you for clarifying, @Philippe_Hausler! Isn't the next function marked as throws in the protocol? Wouldn't that mean that next could always potentially throw?

I've also been thinking about something like merge, where you take two sequences and combine them to always take the latest value. This one feels pretty tricky, and for some reason I have an inkling that it can't be written with structured concurrency (although I'm hoping I'm wrong). Have you looked into that at all?

As for throws: when a protocol requirement is marked throws, it can be satisfied by either a throwing or a non-throwing function. So a concrete adopter of AsyncIteratorProtocol can have a next that either throws or does not. The protocol is also marked @rethrows, which makes that difference part of the calculation when it comes to rethrowing. So I would expect zip, merge, and other AsyncSequences composed from others to throw when their bases throw, but not when the bases do not.
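
A small sketch of the concrete half of that point, assuming two hypothetical sequences, `Countdown` and `ThrowingCounter` (plus hypothetical helpers `sumCountdown` and `countUntilFailure`). Both conform to the same protocols, but because `Countdown`'s `next()` does not throw, iterating it needs no `try`, while `ThrowingCounter` requires `for try await`. A composite like zip or merge would ideally forward this distinction from its bases; the sketch only shows the concrete case.

```swift
// Hypothetical sequence whose iterator never throws.
struct Countdown: AsyncSequence {
    typealias Element = Int
    let start: Int

    struct AsyncIterator: AsyncIteratorProtocol {
        var current: Int
        // A non-throwing witness for the `async throws` requirement.
        mutating func next() async -> Int? {
            guard current > 0 else { return nil }
            defer { current -= 1 }
            return current
        }
    }

    func makeAsyncIterator() -> AsyncIterator { AsyncIterator(current: start) }
}

struct TooFarError: Error {}

// Hypothetical sequence whose iterator throws after `limit` elements.
struct ThrowingCounter: AsyncSequence {
    typealias Element = Int
    let limit: Int

    struct AsyncIterator: AsyncIteratorProtocol {
        var current = 0
        let limit: Int
        mutating func next() async throws -> Int? {
            current += 1
            if current > limit { throw TooFarError() }
            return current
        }
    }

    func makeAsyncIterator() -> AsyncIterator { AsyncIterator(limit: limit) }
}

func sumCountdown(from start: Int) async -> Int {
    var total = 0
    // No `try`: this concrete iterator's next() cannot throw.
    for await value in Countdown(start: start) { total += value }
    return total
}

func countUntilFailure(limit: Int) async -> (values: [Int], failed: Bool) {
    var values: [Int] = []
    do {
        // `try` is required: this concrete iterator's next() throws.
        for try await value in ThrowingCounter(limit: limit) { values.append(value) }
        return (values: values, failed: false)
    } catch {
        return (values: values, failed: true)
    }
}
```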

I have also looked into merge. It is definitely accomplishable, but it is not immediately trivial (similar to zip).
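
One possible shape for merge, sketched under assumptions: `merge` here is a hypothetical free function, both bases share a Sendable element type, and it uses an unstructured `Task` that runs a throwing task group, forwarding elements through an `AsyncThrowingStream` in whatever order they arrive. Each base's iterator stays inside its own child task, so unlike the zip constraints, the iterators themselves do not need to be Sendable.

```swift
// Hypothetical merge: forwards every element from either base, in arrival order.
func merge<Base1: AsyncSequence & Sendable, Base2: AsyncSequence & Sendable>(
    _ base1: Base1, _ base2: Base2
) -> AsyncThrowingStream<Base1.Element, Error>
where Base1.Element == Base2.Element, Base1.Element: Sendable {
    AsyncThrowingStream { continuation in
        let task = Task {
            do {
                try await withThrowingTaskGroup(of: Void.self) { group in
                    // Each base is iterated inside its own child task, so its
                    // iterator never crosses a task boundary.
                    group.addTask {
                        for try await element in base1 { continuation.yield(element) }
                    }
                    group.addTask {
                        for try await element in base2 { continuation.yield(element) }
                    }
                    // Rethrows the first child error; leaving the group with an
                    // error cancels the remaining child.
                    for try await _ in group {}
                }
                continuation.finish()
            } catch {
                continuation.finish(throwing: error)
            }
        }
        // If the consumer stops early, cancel the producing tasks.
        continuation.onTermination = { _ in task.cancel() }
    }
}
```

This sketch offers no backpressure: the stream buffers without bound, so a fast base can run ahead of a slow consumer. Supporting backpressure would need a handoff where each producer waits for demand before calling `next()` again.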
