Swift Async Algorithms Proposal: Merge

Merge

Introduction

In the category of combinations of asynchronous sequences there are a few different potential behaviors. Every member of this category takes two or more AsyncSequence types and produces one AsyncSequence. One fundamental behavior is taking all values produced by the inputs and resuming the iteration of the single downstream AsyncSequence with those values. This shape is called merge.

Detailed Design

Merge takes two or more asynchronous sequences sharing the same element type and combines them into one singular asynchronous sequence of those elements.

let appleFeed = URL(string: "http://www.example.com/ticker?symbol=AAPL")!.lines.map { "AAPL: " + $0 }
let nasdaqFeed = URL(string: "http://www.example.com/ticker?symbol=^IXIC")!.lines.map { "^IXIC: " + $0 }

for try await ticker in merge(appleFeed, nasdaqFeed) {
  print(ticker)
}

Given some sample inputs the following merged events can be expected.

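| Timestamp | appleFeed | nasdaqFeed | merged output   |
|-----------|-----------|------------|-----------------|
| 11:40 AM  | 173.91    |            | AAPL: 173.91    |
| 12:25 PM  |           | 14236.78   | ^IXIC: 14236.78 |
| 12:40 PM  |           | 14218.34   | ^IXIC: 14218.34 |
| 1:15 PM   | 173.00    |            | AAPL: 173.00    |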

This function family and the associated family of return types are prime candidates for variadic generics. Until that proposal is accepted, these will be implemented in terms of two- and three-base sequence cases.

public func merge<Base1: AsyncSequence, Base2: AsyncSequence>(_ base1: Base1, _ base2: Base2) -> AsyncMerge2Sequence<Base1, Base2>

public func merge<Base1: AsyncSequence, Base2: AsyncSequence, Base3: AsyncSequence>(_ base1: Base1, _ base2: Base2, _ base3: Base3) -> AsyncMerge3Sequence<Base1, Base2, Base3>

public struct AsyncMerge2Sequence<Base1: AsyncSequence, Base2: AsyncSequence>: Sendable
  where
    Base1.Element == Base2.Element,
    Base1: Sendable, Base2: Sendable,
    Base1.Element: Sendable, Base2.Element: Sendable,
    Base1.AsyncIterator: Sendable, Base2.AsyncIterator: Sendable {
  public typealias Element = Base1.Element

  public struct Iterator: AsyncIteratorProtocol {
    public mutating func next() async rethrows -> Element?
  }

  public func makeAsyncIterator() -> Iterator
}

public struct AsyncMerge3Sequence<Base1: AsyncSequence, Base2: AsyncSequence, Base3: AsyncSequence>: Sendable
  where
    Base1.Element == Base2.Element, Base1.Element == Base3.Element,
    Base1: Sendable, Base2: Sendable, Base3: Sendable,
    Base1.Element: Sendable, Base2.Element: Sendable, Base3.Element: Sendable,
    Base1.AsyncIterator: Sendable, Base2.AsyncIterator: Sendable, Base3.AsyncIterator: Sendable {
  public typealias Element = Base1.Element

  public struct Iterator: AsyncIteratorProtocol {
    public mutating func next() async rethrows -> Element?
  }

  public func makeAsyncIterator() -> Iterator
}

The merge(_:...) function takes two or more asynchronous sequences as arguments and produces an AsyncMergeSequence (AsyncMerge2Sequence or AsyncMerge3Sequence, depending on the number of bases), which is itself an asynchronous sequence.

Since the bases comprising the AsyncMergeSequence must be iterated concurrently to produce the latest value, those sequences must be able to be sent to child tasks. This means that the base asynchronous sequences, their iterators, and the elements they produce must all be Sendable.

When iterating an AsyncMergeSequence, the sequence terminates when all of the base asynchronous sequences terminate, since this means there is no potential for any further elements to be produced.

The throwing behavior of AsyncMergeSequence is that if any of the bases throw, then the composed asynchronous sequence throws on its iteration. If at any point an error is thrown by any base, the other iterations are cancelled and the thrown error is immediately thrown to the consuming iteration.
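
The three behaviors described above (concurrent iteration in child tasks, termination once every base has finished, and the first error cancelling the other iterations) can be sketched with only the standard library. This is a minimal illustration, not the package's implementation: the name sketchMerge is hypothetical, and the sketch returns an AsyncThrowingStream rather than a dedicated AsyncMerge2Sequence type.

```swift
// Hypothetical helper, not the package's code: merges two sequences by
// iterating each one in its own child task.
func sketchMerge<A: AsyncSequence & Sendable, B: AsyncSequence & Sendable>(
  _ a: A, _ b: B
) -> AsyncThrowingStream<A.Element, Error>
where A.Element == B.Element, A.Element: Sendable {
  AsyncThrowingStream { continuation in
    let task = Task {
      do {
        try await withThrowingTaskGroup(of: Void.self) { group in
          // Each base is captured by a child task, hence the Sendable requirement.
          group.addTask { for try await value in a { continuation.yield(value) } }
          group.addTask { for try await value in b { continuation.yield(value) } }
          // Wait for every child. The first error is rethrown here, and
          // leaving the group by throwing cancels the remaining child.
          for try await _ in group {}
        }
        // All bases finished without error, so the merged stream finishes.
        continuation.finish()
      } catch {
        continuation.finish(throwing: error)
      }
    }
    // Stop iterating the bases if the consumer goes away.
    continuation.onTermination = { _ in task.cancel() }
  }
}
```

With two finite inputs, every element of both inputs is delivered exactly once; the interleaving depends on scheduling, so only the set of elements is predictable.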

Naming

Since the inherent behavior of merge(_:...) merges values from multiple streams into a singular asynchronous sequence, the naming is intended to be quite literal. There are precedent terms of art in other frameworks and libraries (listed in the comparison section). Other naming takes the form of "withLatestFrom". This was disregarded since the "with" prefix is often most associated with the passing of a closure and some sort of contextual concept; withUnsafePointer or withUnsafeContinuation are prime examples.

Comparison with other libraries

ReactiveX: ReactiveX has an API definition of Merge as a top-level function for merging Observables.

Combine: Combine has an API definition of merge(with:) as an operator-style method for merging Publishers.

Effect on API resilience

@frozen and @inlinable

These types utilize rethrowing mechanisms that are awaiting an implementation in the compiler for supporting implementation-based rethrows, so none of them are marked as frozen or inlinable. This feature (discussed as rethrows(unsafe) or rethrows(SourceOfRethrowyness)) has not yet been reviewed or implemented. The current implementation takes liberties with an internal protocol to accomplish this task. Future revisions will remove that protocol trick and replace it with proper rethrows semantics at the actual call site. The types are expected to be stable boundaries to prevent that workaround for the compiler's yet-to-be-supported rethrowing (or TaskGroup rethrowing) mechanisms. As soon as that feature is resolved, a more detailed investigation of the performance impact of inlining and frozen should be done before 1.0.

Alternatives considered

It was considered to shape merge as an extension method on AsyncSequence; however, that implies a "primary-ness" of one AsyncSequence over another. Since a global function implies no preference for one side or another, it was decided that symmetry between the asynchronous version and the synchronous version carried the right connotations.

What's the behavior if more than one base sequence has an element ready? Will the first sequence always win, or will the combined sequence go round-robin, or something else?

The current behavior is that whichever input is ready first wins; after that it cycles through the inputs by modulo (i.e. round robin).
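
To make the "cycles through by modulo" part concrete, here is a small sketch of round-robin selection among ready inputs. It is only an illustration of the idea; nextReadyIndex and its parameters are hypothetical names, and the actual implementation may track readiness quite differently.

```swift
// Hypothetical illustration of round-robin tie-breaking; not the package's code.
// `ready[i]` says whether input i has an element available, and `last` is the
// index that won the previous round (pass -1 before the first round).
func nextReadyIndex(ready: [Bool], after last: Int) -> Int? {
  let count = ready.count
  guard count > 0 else { return nil }
  // Start just past the previous winner and wrap around by modulo, so the
  // previous winner is only chosen again if nobody else is ready.
  for offset in 1...count {
    let candidate = (last + offset) % count
    if ready[candidate] { return candidate }
  }
  return nil  // no input is ready
}
```

For two inputs that are both always ready, the winner alternates between index 0 and index 1, so neither input can starve the other.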

One concern that was brought up privately to me:
Because this is a top-level function, there is a distinct lack of discoverability via auto-completion. It requires the developer to know the function a priori.

This concern is definitely valid. The merge function belongs in the same group as zip: it is topologically a combinator of sorts, taking at least 2 inputs and combining them into 1 output. Additionally, merge acts just as zip does in that it has no preference for which "side" produces a value; so a spelling such as appleFeed.merge(nasdaqFeed) perhaps implies the wrong thing. Instead we want to connote that the two feeds are merged together without respect to the order in which they are written.

This means that the existing idioms set forth by top-level functions like zip or min are at odds with the discoverability of the function at a global scope. Unfortunately I don't see the discoverability problem as the distinct responsibility of the API itself; it is more the responsibility of the tooling that presents the API.

So in short: I agree with the concerns, but see no real way to address them while keeping a semantically proper spelling that stays similar to the family of functions it clearly belongs to.

If I understand this answer, this isn't so great: during my akka-streams days we saw Rx systems starve "other" streams when one was merged with an "always ready" data source, while akka streams performed more fairly, going round robin if both sources were available (AFAIR) and thus avoiding starving (or "never pulling from") the other streams being merged.

Tho maybe it is fine for the naive merge and we'd introduce better ones with time...?

Is the current behavior what Combine has, and thus you'd want to mirror it exactly, or is it just how it happens to be implemented right now?

When iterating an AsyncMergeSequence, the sequence terminates when all of the base asynchronous sequences terminate, since this means there is no potential for any further elements to be produced.

What's the plan for an eagerly completing merge, i.e. one that completes as soon as any of the upstreams has completed?
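
For reference, one possible shape of such an eagerly completing variant is sketched below using only the standard library. This is an assumption about what the semantics could look like, not anything the proposal specifies; sketchEagerMerge is a hypothetical name, and the result is an AsyncThrowingStream rather than a dedicated sequence type.

```swift
// Hypothetical sketch of an "eager" merge: the merged stream finishes as soon
// as either input finishes. Not part of the proposal or the package.
func sketchEagerMerge<A: AsyncSequence & Sendable, B: AsyncSequence & Sendable>(
  _ a: A, _ b: B
) -> AsyncThrowingStream<A.Element, Error>
where A.Element == B.Element, A.Element: Sendable {
  AsyncThrowingStream { continuation in
    let task = Task {
      do {
        try await withThrowingTaskGroup(of: Void.self) { group in
          group.addTask { for try await value in a { continuation.yield(value) } }
          group.addTask { for try await value in b { continuation.yield(value) } }
          // The first child to finish (or throw) decides the outcome.
          _ = try await group.next()
          // Ask the other child to stop; this relies on the remaining input
          // honoring task cancellation.
          group.cancelAll()
        }
        continuation.finish()
      } catch {
        continuation.finish(throwing: error)
      }
    }
    continuation.onTermination = { _ in task.cancel() }
  }
}
```

Note that elements the slower input yields before it observes cancellation may still be delivered, so "eager" here bounds when the merged stream ends, not exactly which trailing elements it contains.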

What's the plan for future evolution where some streams are more "prioritized", or for other semantics? Would we introduce more named functions (I'm totally happy with that), or would you plan for some "strategy" enum to be passed? For reference, other semantics we might need to offer:

  • mergePreferred (Akka Documentation)
    • Merge multiple sources. If all sources have elements ready, emit the preferred source first. Then emit the preferred source again if another element is pushed. Otherwise, emit all the secondary sources. Repeat until streams are empty.
  • mergePrioritized (Akka Documentation)
    • Merge multiple sources. Prefer sources depending on priorities if all sources have elements ready. If a subset of all sources have elements ready the relative priorities for those sources are used to prioritize. For example, when used with only two sources, the left source has a probability of (leftPriority) / (leftPriority + rightPriority) of being prioritized and similarly for the right source.
  • mergeSorted (Akka Documentation)

etc. :)
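
As a worked example of the mergePrioritized rule quoted above, the selection probabilities can be computed as follows. This is just an illustration of the arithmetic; selectionProbabilities is a hypothetical helper, not an Akka or Swift API.

```swift
// Hypothetical helper illustrating the mergePrioritized arithmetic.
// Returns, for each source, the chance of it being picked next; sources
// without an element ready get 0.
func selectionProbabilities(priorities: [Int], ready: [Bool]) -> [Double] {
  precondition(priorities.count == ready.count, "one readiness flag per source")
  // Only sources with an element ready take part in the draw.
  let total = zip(priorities, ready).reduce(0) { $0 + ($1.1 ? $1.0 : 0) }
  guard total > 0 else { return priorities.map { _ in 0.0 } }
  return zip(priorities, ready).map { priority, isReady in
    isReady ? Double(priority) / Double(total) : 0.0
  }
}

// Two ready sources with priorities 3 and 1:
// selectionProbabilities(priorities: [3, 1], ready: [true, true])
// returns [0.75, 0.25], i.e. 3 / (3 + 1) for the left source.
```

When only a subset is ready, the same formula is applied to that subset, which matches the "relative priorities" wording in the description.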

Thanks for the info in advance, just want to make sure we future-proof these functions' shapes.

Also, on a tangent (super happy to see all the work on async algos!): how would we name a 'conflation' (merge) strategy on a single async stream which allows for user-defined merging on an element level? It is one of the (for us) common use cases, and it is a bit unclear how it would fit in properly (as we discussed previously in the context of AsyncStream) for a good implementation. (I can break out that discussion if it is not relevant for the proposal, but in case it affects the naming approach I wanted to bring it up.)

I think a priority based system is a potentially useful avenue to explore as an additional consideration - however I don't think they are mutually exclusive. It is definitely something to consider when expanding these APIs (same thing for zip, which has similar behavioral tweak potentials).

That almost sounds like a map to me. Merge as proposed is a combinator of AsyncSequences, so perhaps you are talking about a split, then a map of each side, and then a merge of those splits back together?

let (a, b) = sourceAsyncSequence.split()
for await c in merge(a.map(transformation), b.map(transformation)) {
}

which could also be envisioned as:

let transformed = sourceAsyncSequence.chunked(byCount: 2).flatMap { $0.map(transformation).async }
for await c in transformed {
}

If I am understanding the request correctly.

I'd like to do the same "cleanup" for the tests that I did for zip. Unfortunately I cannot do it now; is it OK if we do a PR afterwards?