[Pitch] withLatestFrom

Hi everyone,

This post is to gather feedback and ideas about an operator equivalent to withLatestFrom that we can usually find in the reactive world.

The first point would be to decide if this is relevant from an AsyncSequence paradigm and if we want to go down that road in this repo.

For reference I have already implemented a draft version here: https://github.com/twittemb/swift-async-algorithms/blob/withLatestFrom/Sources/AsyncAlgorithms/AsyncWithLatestFromSequence.swift
with the related unit tests: https://github.com/twittemb/swift-async-algorithms/blob/withLatestFrom/Tests/AsyncAlgorithmsTests/TestWithLatestFrom.swift

The goal of such an operator is to combine 2 AsyncSequences that we can call Base and Other. The characteristics of the resulting AsyncSequence would be:

  • the output is a tuple (Base.Element, Other.Element)
  • it finishes when Base or Other finishes
  • it rethrows when Base or Other throws (if Base and Other are non throwing then the sequence is non throwing)
  • when a next Base.Element is captured, then the last known Other.Element is gathered and forms the awaited next tuple.
  • when a next Base.Element is captured, and there is not yet an Other.Element, then we wait for Other to produce its first element.

To be able to know the last element from Other at any time, It means that Other is being iterated over independently from Base in its dedicated Task.

Let's take a simplified example from the unit tests to better understand the expected outputs:

var base = GatedSequence([1, 2, 3]) // Sequence that outputs an element only when freed by us
let other = AsyncChannel<String>()

let sequence = base.withLatestFrom(other)

let validator = Validator<(Int, String)>()
validator.test(sequence) { iterator in
  let pastEnd = await iterator.next()
  XCTAssertNil(pastEnd)
  finished.fulfill()
}

var value = await validator.validate()
XCTAssertEqual(value, [])

base.advance() // we free the value 1 in Base
await other.send("a") // we send the value "a" in Other

value = await validator.validate() // we get the next value from sequence
XCTAssertEqual(value, [(1, "a")]) // at this point it behaves like `zip`

base.advance() // we free the value 2 in Base
value = await validator.validate() // get the next value from sequence
XCTAssertEqual(value, [(1, "a"), (2, "a")]) // diverges from `zip` because we don't wait a new value in Other, the last known one is "a"

await other.send("b")
await other.send("c") // we send several values in Other
value = validator.current
XCTAssertEqual(value, [(1, "a"), (2, "a")])

base.advance() // we free the value 3 in Base
value = await validator.validate() // get the next value from sequence
XCTAssertEqual(value, [(1, "a"), (2, "a"), (3, "c")]) // the last known value from Other is now "c"

await other.finish()

wait(for: [finished], timeout: 1.0)
value = validator.current
XCTAssertEqual(value, [(1, "a"), (2, "a"), (3, "c")])

This kind of operator works really well when Other is an AsyncChannel because send() waits for the value to be consumed to un-suspend. It makes the Other's current value predictable although it is iterated over in its dedicated Task.

On the other hand, when Other is a more traditional AsyncSequence, as it is iterated in a Task, we have no control on the pace at which the iteration happens. The Other's current value might not YET be the one we are thinking it will be.

Do you think this is something we could need in the library? in 1.0.0?
Can you think of any edge cases that would make such an operator non viable?

Let's iterate and gather feedback together :blush:.

Thanks.

5 Likes

Can you highlight what the differences with combineLatest will be ?

At first sight, they look very similar, so detailing how they differ may help to understand their respective use cases.

2 Likes

Those operators are indeed close. The big difference is that withLatestFrom won't produce any tuple when the Other sequence produces a value. withLatestFrom is driven by the Base sequence pace.
Whereas combineLatest will produce a tuple whenever one of the two sequence produces a value.

combineLatest is driven equally by Base and Other.

with let seq = base.withLatestFrom(other)

base : |-----1-------2-------------3------
other: |-a---|-------|------b--c---|------
             |       |             |
seq  : |----(1, a)--(2, a)--------(3, c)--

with let seq = combineLatest(base, other)

base : |-----1-------2-------------------------3------
other: |-a---|-------|----------b--------c-----|------
             |       |          |        |     |
seq  : |----(1, a)--(2, a)--(2, b)---(2, c)---(3, c)--

I hope this helps.

4 Likes

I've used this kind of operator under Combine as a means to use a signal generated from one source to "play out" values from another, imposing it's timing on the original set. The specific thing I was doing was taking an array of values and providing the values one at a time, with the timing being specified by the upstream signal (the base signal in the above example). In my case, I was using a Combine Timer() publisher to have that occur on a mostly regular basis (every N milliseconds kind of thing).

I don't know if there's a better way to do that kind of thing with async-stream today, but for the use case of replaying a sequence of values with specific timings controlled by an async sequence, I'd find it useful.

2 Likes

If you want to get every value from a second stream, you should probably use the zip operator, which emit a value every time both streams emit a new one.

With withLatestFrom, all your original values will be lost because this operator drop value from the secondary stream when a new one is emitted and no new value are still arrived on the base one.

Thank you.

Just in case people prefer the table found in the guides to understand what an operator do, here is a table explaining withLatestFrom

Timestamp appleFeed nasdaqFeed combined output
11:40 AM 173.91
12:25 AM 14236.78 AAPL: 173.91 NASDAQ: 14236.78
12:40 AM 14218.34
12:45 AM 14224.85
1:15 PM 173.00 AAPL: 173.00 NASDAQ: 14224.85
2 Likes

I think one way to look at the proposed operator is that it's used to sample other at the rate of base.

I think there's something to that name (sample), even if the name withLatestFrom is popular from some reactive libraries. (For example, flip the order of arguments and call it other.sampled(atRateOf: base) or something like that.)

8 Likes

I've been thinking about the names from traditional Rx (especially combineLatest). The question posed here about the difference between withLatestFrom and combineLatest is apt. Even if we can explain how they are different, it's clear that the names aren't really helping. They all feel like they are from different naming conventions. with as a prefix in Swift, for example, almost always comes with a closure argument.

Is there some way we could help people understand the differences between zip, combineLatest, withLatestFrom by using a set of names that feel like they are part of a closely related group of algorithms? Is withLatestFrom a boolean argument to combineLatest that alters its behavior?

7 Likes

I'm not sure about boolean flags. In general, I think it's best to have different functions with different names. I think it's easier to explain them and present specific examples this way. Maybe a reference from one function to another function works best. Something like: "If your use case is this other one, this is the function that you're looking for".

1 Like

help people understand the differences between zip , combineLatest , withLatestFrom

I've been wrestling with this too, and agree the traditional Rx vocabulary falls short here.

I see all of these methods as answers to the general question "how do I receive outputs from n sequences at the same time?", and the differences lie in their respective answers to "when do I receive a new set of outputs?":

  • zip: once all sequences have emitted a new value
  • combineLatest: once any sequence has emitted a new value
  • withLatestFrom: once self has emitted a new value

I do imagine a few challenges in trying to reconcile these:

  • zip's name and semantics benefit from its established synchronous parallel—but combineLatest and withLatestFrom require the async-specific axis of time
  • Swift's naming conventions seemingly apply asymmetrically to these algorithms, as the former two have no "obvious self" (in API Design Guidelines terms), while the third clearly does

That said, these algorithms are evidently closely related, and I think some degree of symmetry in their names and shapes would be a win for teachability.

1 Like

On the understanding they only differ by time considerations, a possible (probably bad) naming suggestion:
zip -> zipDiscretely
combineLatest -> zipContinuously
withLatestFrom -> zip(sampling:atRateOf:)

The reasoning of my opinion:
zip / zipDiscretely considers each sequence to be of discrete events, which it emits when the tuple event is complete.
combineLatest / zipContinuously considers each sequence to be of leading edges of continuous events and emits leading edges of the tuple event.
withLatestFrom / zip(sampling:atRateOf:) considers one as discrete events on which it is sampling the continuous events the other is the leading edges of, as @pyrtsa noted above.

1 Like

I like the idea of keeping zip as the main idea since:

  • it is a terminology already used in the standard library
  • overall it is already known from the dev community
  • it matches the 3 operators since they all intend to aggregate values from 2 sequences but at different pace

I can perhaps bring food for though with the notion of symmetry.

  • zipSymmetric for classical zip: zipSymmetric(left: A, right: B). It implies that both parts are constrained by the same pace in terms of forming the output tuples.
  • zipAsymmetric for both combineLatest and withLatestFrom. it implies that parts can freely advance to form the output tuples.

We could use an enum parameter to distinguish the applied strategy:

  • zipAsymmetric(left: A, right: B, elementWhen: .newFromLeftOrRight) for combineLatest
  • zipAsymmetric(left: A, right: B, elementWhen: .newFromLeft) for withLatestFrom.

The enum name would be to refine though :slight_smile:.

3 Likes

I kinda like this idea, but then, think it might be better to just call it zip(lhs: A, rhs: B, with samplingStrategy: SamplingStrategy) and use the strategies .newOutputFromBoth, .newOutputFromLeftOrRight, .newOutputFromLeft and while we're at it .newOutputFromRight.

1 Like

Problem with this approach is that it doesn't scale. I imagine it must be rare, but we could also consider zips with more than two async sequences.

Yep I was about to post the same idea :relaxed:.

we could play with ‘AND’ / ‘OR’ in the enum values:

  • newFromLeftAndRight for zip
  • newFromLeftOrRight for combineLatest
  • newFromLeft or newFromRight for withLatestFrom

[EDIT]: does not work well when 3 params though

Thinking about it this way, using variadic generics, I see the following signature:

zip<T...>(sequences: T..., with strategy: Strategy)
zip(a, b, c, with: .newOutputFromAll)
zip(a, b, c, with: .newOutputFromAny)

I'm not sure it is possible, but maybe something like this would be nice?

zip(a, b, c, with: .newOutputFrom(a))

Otherwise, only thing I could think is to make "with latest from" a function on the reference async sequence, but then, the zip name wouldn't work that well.

a.zip(b, c)

The strategy is lost here, so we would need another name.

a.something(b, c)

Couldn't think of what that something should be.

1 Like

Just thought of something else:

zip(a, b, c, outputtingWhen: .newOutputFromAll)
zip(a, b, c, outputtingWhen: .newOutputFromAny)
zip(a, b, c, outputtingWhenNewOutputFrom: d)

Mixing enum cases and function labels might be confusing, though.

An alternate naming:

zip(a, b, elementOn: .newElementFromAll)
zip(a, b, elementOn: .newElementFromAny)
zip(b, elementOn: .newElementFrom(a))

zip(a, b, c, elementOn: .newElementFromAll)
zip(a, b, c, elementOn: .newElementFromAny)
zip(b, c, elementOn: .newElementFrom(a))
1 Like

You're right! I was using Output because I had Combine's Publisher in my mind. Element is the associated type for AsyncSequence, right? So it makes more sense to use element instead of output.

Did you check if we can do that? Is that possible?