Swift Async Algorithms Proposal: Joined

Joined

Introduction

The joined() and joined(separator:) algorithms on AsyncSequences provide APIs to concatenate an AsyncSequence of AsyncSequences.

extension AsyncSequence where Element: AsyncSequence {
  public func joined() -> AsyncJoinedSequence<Self>
}

extension AsyncSequence where Element: AsyncSequence {
  public func joined<Separator: AsyncSequence>(separator: Separator) -> AsyncJoinedBySeparatorSequence<Self, Separator>
}

Detailed Design

These algorithms iterate over the elements of each AsyncSequence one bye one, i.e. only after the iteration of one AsyncSequence has finished the next one will be started.

 let appleFeed = URL("http://www.example.com/ticker?symbol=AAPL").lines
 let nasdaqFeed = URL("http://www.example.com/ticker?symbol=^IXIC").lines

 for try await line in [appleFeed, nasdaqFeed].async.joined() {
   print("\(line)")
 }

Given some sample inputs the following combined events can be expected.

Timestamp appleFeed nasdaqFeed output
11:40 AM 173.91 173.91
12:25 AM 14236.78
12:40 AM 14218.34
1:15 PM 173.00 173.00
1:15 PM 14236.78
1:15 PM 14218.34

The joined() and joined(separator:) methods are available on AsyncSequences with elements that are AsyncSequences themselves and produce either an AsyncJoinedSequence or an AsyncJoinedBySeparatorSequence.

As soon as an inner AsyncSequence returns nil the algorithm continues with iterating the next inner AsyncSequence.

The throwing behaviour of AsyncJoinedSequence and AsyncJoinedBySeparatorSequence is that if any of the inner AsyncSequences throws, then the composed sequence throws on its iteration.

Naming

The naming follows to current method naming of the standard library's joined method.
Prior art in the reactive community often names this method concat; however, we think that an alignment with the current method on Sequence is better.

Comparison with other libraries

ReactiveX ReactiveX has an API definition of Concat as a top level function for concatenating Observables.

Combine Combine has an API definition of append which offers similar functionality but limited to concatenating two individual Publishers.

8 Likes

Looks good to me. Thanks for bringing this proposal to the community @FranzBusch!

2 Likes

From the example, it looks like the results of the pending sequences are held until the current sequence completes and then delivered immediately. i.e. the Nasdaq feed is emitting values that are held until the Apple feed completes and then immediately delivered.

Is that the case?

I think I would expect pending sequences to only be started once the current sequence completes.

AsyncJoinedSequence only calls an inner sequence's makeAsyncIterator after the prior inner sequence's iterator has been exhausted. This matches your expectation.

For the appleFeed/nasdaqFeed example to make sense, the feeds must be “cold” (to use ReactiveX terminology).

1 Like

Seems like the example needs to be updated then, as it certainly looks like the other sequences are hot but just caching values until the earlier sequences finish.

3 Likes

That’s a great question. The AsyncSequences in this example are buffering their values until a Subscriber is consuming them.

You are right that this is not obvious and I will update the example to make this more obvious.

This conversation shows me that a more detailed guideline what AsyncSequences can do would be helpful.

1 Like

Got it. So the sequences provided to Join are buffering async sequences rather than Join performing any buffering itself.

Sounds great to me.

And I’m guessing the ticker sequences can be made into buffering sequences by wrapping in AsyncStream?

If so, that’s really useful to know, I had no idea we could use AsyncStream to make hot/buffering sequences like that.

Right the sequences are assumed to buffer. Join is simply a Processor and nothing more.

AsyncStream is an AsyncSequence „source“ that provides various buffering strategies.

1 Like