Swift Async Algorithms Proposal: CombineLatest

Combine Latest

Introduction

Similar to the zip algorithm, there is a need to combine the latest values from multiple input asynchronous sequences. Since AsyncSequence augments the concept of a sequence with the characteristic of time, the composition of elements need not be limited to pairwise emissions; it can also be temporal. This means that it is useful to emit a new tuple each time one of the inputs produces a value. The combineLatest algorithm provides precisely that.

Detailed Design

This algorithm combines the latest values produced from two or more asynchronous sequences into an asynchronous sequence of tuples.

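import AsyncAlgorithms
import Foundation

let appleFeed = URL(string: "http://www.example.com/ticker?symbol=AAPL")!.lines
// "^" is percent-encoded as %5E so that URL(string:) accepts the query on all Foundation versions
let nasdaqFeed = URL(string: "http://www.example.com/ticker?symbol=%5EIXIC")!.lines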

for try await (apple, nasdaq) in combineLatest(appleFeed, nasdaqFeed) {
  print("AAPL: \(apple) NASDAQ: \(nasdaq)")
}

Given some sample inputs, the following combined events can be expected.

| Timestamp | appleFeed | nasdaqFeed | combined output |
| --- | --- | --- | --- |
| 11:40 AM | 173.91 | | |
| 12:25 PM | | 14236.78 | AAPL: 173.91 NASDAQ: 14236.78 |
| 12:40 PM | | 14218.34 | AAPL: 173.91 NASDAQ: 14218.34 |
| 1:15 PM | 173.00 | | AAPL: 173.00 NASDAQ: 14218.34 |
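The sample timeline can be replayed with a small synchronous model to see why nothing is emitted for the first row. This is only a sketch: FeedEvent and replayCombineLatest are hypothetical names introduced for illustration, and the real algorithm works on asynchronous sequences rather than on an array of events.

```swift
// Hypothetical model of one timeline entry: which feed produced a value.
enum FeedEvent {
  case apple(Double)
  case nasdaq(Double)
}

// Replays events in order and returns every tuple that a combine-latest
// style composition would emit for them.
func replayCombineLatest(_ events: [FeedEvent]) -> [(apple: Double, nasdaq: Double)] {
  var latestApple: Double?
  var latestNasdaq: Double?
  var output: [(apple: Double, nasdaq: Double)] = []

  for event in events {
    // Remember the newest value of whichever feed just produced.
    switch event {
    case .apple(let value): latestApple = value
    case .nasdaq(let value): latestNasdaq = value
    }
    // Nothing can be emitted until every feed has produced at least once.
    if let apple = latestApple, let nasdaq = latestNasdaq {
      output.append((apple: apple, nasdaq: nasdaq))
    }
  }
  return output
}

// The four events from the sample inputs, in timestamp order.
let combined = replayCombineLatest([
  .apple(173.91), .nasdaq(14236.78), .nasdaq(14218.34), .apple(173.00),
])
for pair in combined {
  print("AAPL: \(pair.apple) NASDAQ: \(pair.nasdaq)")
}
```

Four input events yield three tuples, because the first AAPL value has no NASDAQ value to pair with yet.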

This function family and the associated family of return types are prime candidates for variadic generics. Until that proposal is accepted, these will be implemented in terms of two- and three-base sequence cases.

public func combineLatest<Base1: AsyncSequence, Base2: AsyncSequence>(_ base1: Base1, _ base2: Base2) -> AsyncCombineLatest2Sequence<Base1, Base2>

public func combineLatest<Base1: AsyncSequence, Base2: AsyncSequence, Base3: AsyncSequence>(_ base1: Base1, _ base2: Base2, _ base3: Base3) -> AsyncCombineLatest3Sequence<Base1, Base2, Base3>

public struct AsyncCombineLatest2Sequence<Base1: AsyncSequence, Base2: AsyncSequence>: Sendable
  where
    Base1: Sendable, Base2: Sendable,
    Base1.Element: Sendable, Base2.Element: Sendable,
    Base1.AsyncIterator: Sendable, Base2.AsyncIterator: Sendable {
  public typealias Element = (Base1.Element, Base2.Element)

  public struct Iterator: AsyncIteratorProtocol {
    public mutating func next() async rethrows -> Element?
  }

  public func makeAsyncIterator() -> Iterator
}

public struct AsyncCombineLatest3Sequence<Base1: AsyncSequence, Base2: AsyncSequence, Base3: AsyncSequence>: Sendable
  where
    Base1: Sendable, Base2: Sendable, Base3: Sendable,
    Base1.Element: Sendable, Base2.Element: Sendable, Base3.Element: Sendable,
    Base1.AsyncIterator: Sendable, Base2.AsyncIterator: Sendable, Base3.AsyncIterator: Sendable {
  public typealias Element = (Base1.Element, Base2.Element, Base3.Element)

  public struct Iterator: AsyncIteratorProtocol {
    public mutating func next() async rethrows -> Element?
  }

  public func makeAsyncIterator() -> Iterator
}

The combineLatest(_:...) function takes two or more asynchronous sequences as arguments and produces an AsyncCombineLatestSequence which is an asynchronous sequence.

Since the bases comprising the AsyncCombineLatestSequence must be iterated concurrently to produce the latest value, those sequences must be able to be sent to child tasks. As a prerequisite, the base asynchronous sequences, their iterators, and the elements they produce must all be Sendable.

If any of the bases terminates before the first element is produced, then the AsyncCombineLatestSequence iteration can never be satisfied. So, if a base's iterator returns nil at the first iteration, then the AsyncCombineLatestSequence iterator immediately returns nil to signify a terminal state. In this particular case, any outstanding iteration of other bases will be cancelled. After the first element is produced, this behavior is different, since the latest values can still be satisfied by at least one base. This means that beyond the construction of the first tuple comprised of the returned elements of the bases, the terminal state of the AsyncCombineLatestSequence iteration will only be reached when all of the base iterations reach a terminal state.
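The termination rule can be sketched as a small piece of bookkeeping. TerminationModel and its methods are hypothetical names and this is not the library's implementation; it also assumes one reading of the rule above, namely that a base which ends without ever producing an element finishes the whole sequence, and that otherwise the sequence finishes only once every base has ended.

```swift
// Hypothetical bookkeeping for the termination rule; bases are
// identified by index.
struct TerminationModel {
  private var produced: [Bool]
  private var ended: [Bool]

  init(baseCount: Int) {
    produced = Array(repeating: false, count: baseCount)
    ended = Array(repeating: false, count: baseCount)
  }

  // Records that the base at `index` produced an element.
  mutating func recordElement(from index: Int) {
    produced[index] = true
  }

  // Records that the base at `index` returned nil.
  // Returns true when the combined sequence should finish.
  mutating func recordEnd(of index: Int) -> Bool {
    ended[index] = true
    // A base that never produced means no tuple can ever include it.
    if !produced[index] { return true }
    // Otherwise the latest values stay usable until every base has ended.
    return ended.allSatisfy { $0 }
  }
}

// Base 1 ends before producing anything: finish immediately.
var early = TerminationModel(baseCount: 2)
early.recordElement(from: 0)
let endsEarly = early.recordEnd(of: 1)

// Both bases have produced: finish only after both have ended.
var late = TerminationModel(baseCount: 2)
late.recordElement(from: 0)
late.recordElement(from: 1)
let afterFirstEnd = late.recordEnd(of: 0)
let afterSecondEnd = late.recordEnd(of: 1)

print(endsEarly, afterFirstEnd, afterSecondEnd)  // prints "true false true"
```

Cancellation of the remaining bases and error propagation are left out of this model.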

The throwing behavior of AsyncCombineLatestSequence is that if any of the bases throw, then the composed asynchronous sequence throws on its iteration. If at any point (within the first iteration or afterwards), an error is thrown by any base, the other iterations are cancelled and the thrown error is immediately thrown to the consuming iteration.

Naming

Since the inherent behavior of combineLatest(_:...) combines the latest values from multiple streams into a tuple, the naming is intended to be quite literal. There are precedent terms of art in other frameworks and libraries (listed in the comparison section). Other naming takes the form of "withLatestFrom". This was disregarded since the "with" prefix is most often associated with the passing of a closure and some sort of contextual concept; withUnsafePointer and withUnsafeContinuation are prime examples.

Comparison with other libraries

Combine latest often appears in libraries developed for processing events over time, since the concept of a "latest" value only arises when asynchrony is involved.

ReactiveX: ReactiveX has an API definition of CombineLatest as a top-level function for combining Observables.

Combine: Combine has an API definition of combineLatest as an operator-style method for combining Publishers.

Minor question about Sendable constraints: does Base1: Sendable not already imply Base1.Element: Sendable and Base1.AsyncIterator: Sendable?

I wish that were the case; however, it does not seem to work like that. It would be considerably less typing. For example, you could define an asynchronous sequence as Sendable but then define its AsyncIterator as some external type that isn't marked as Sendable. Granted, I feel that the constraints should be cascading; it just seems like an expressibility hole generally for all protocols with associated types.
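A minimal sketch of that situation, with hypothetical names throughout (Cursor, Counter, requireSendable): the sequence is declared Sendable, while its iterator stores a non-Sendable class and so is not Sendable itself.

```swift
// Hypothetical non-Sendable helper: a class with unsynchronized mutable state.
final class Cursor {
  var value = 0
}

// The sequence only stores an Int, so declaring it Sendable is accepted.
struct Counter: AsyncSequence, Sendable {
  typealias Element = Int
  let limit: Int

  // Not Sendable: it stores a Cursor, and nothing in AsyncSequence
  // requires the iterator to follow the sequence's annotation.
  struct Iterator: AsyncIteratorProtocol {
    let cursor = Cursor()
    let limit: Int

    mutating func next() async -> Int? {
      guard cursor.value < limit else { return nil }
      cursor.value += 1
      return cursor.value
    }
  }

  func makeAsyncIterator() -> Iterator {
    Iterator(limit: limit)
  }
}

// Compile-time probe that accepts only Sendable types.
func requireSendable<T: Sendable>(_ type: T.Type) {}

requireSendable(Counter.self)
// requireSendable(Counter.Iterator.self)
// The line above is expected to be diagnosed under strict concurrency checking.
```

A type like Counter would not satisfy the constraints on combineLatest as proposed, which is why the iterator and element requirements are spelled out separately.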

I think my question is more of a "should this actually be allowed?" And the answer to that may very well be "yes"!

I cannot think of a situation where that should be allowed. If I had my druthers, I would claim that the AsyncIterator of every AsyncSequence, and likewise its elements, should share the sequence's Sendable annotation.

That being said, there is merit for some AsyncSequence types (and consequently their iterators and elements) to NOT be Sendable.

Looking good.

One question: combineLatest(_:_:) and combineLatest(_:_:_:) appear to be free functions. Is there a reason we're choosing this format over member functions on AsyncSequence?

One advantage of forbidding Sendable on AsyncIterators is that it enables you to ensure that there cannot possibly be two concurrent calls to next. That enables a wide range of simplifications of the implementation. In fact, my inclination is to say that non-Sendable AsyncIterators are really quite helpful in many cases, as there's often no good reason to require them to be passed from one Task to the next.

Just like zip, these have no preference on which side is the "primary". Making them member functions would imply that type of behavior. combineLatest falls into the same family as zip, so they should share the same free-function form.

While it is true that no two tasks can call next on the same iterator, as long as the AsyncSequence is Sendable you can still have two iterators that are trying to call next. Since all current implementations of an AsyncIterator basically reach out to the underlying AsyncSequence, this doesn't really give you anything.

I am still thinking of a case where a difference in the Sendable annotation of these two types could be used.

Interesting – and merge(_:_:), too.

I wasn't sure at first, but upon reflection I think it makes sense. I'm always hesitant to use free functions, but seeing as they are namespaced behind the module name – I can't think of a good reason not to.

I wonder if this form would even make sense for a theoretical withLatestFrom, in the form of combine(_:onLatestFrom:), or combine(_:onElementFrom:) – even though the arguments are non-commutative perhaps the argument label can be used to make that clear.

Reactive-style frameworks are easier for me to understand when I think about them as pipelines. I think grouping all the manifold points (points in the pipeline where multiple sequences are joined), regardless of whether the operator has a 'primary' element, may make things easier for newcomers.

This way, it's easy to reason that 'straight-line' pipeline sections can be chained, whereas manifold points will need to be constructed explicitly.

While this is often the case today, I don't think we want to excessively tightly tie ourselves to that constraint. It's entirely possible to want to have per-Task structure, such as non-Sendable transformer types, in an AsyncIterator. Simply forbidding this seems unnecessary.

I agree, forbidding it would be unnecessarily restrictive. It could definitely allow a pattern where you share an AsyncSequence across tasks and not allow the iterators to be Sendable.