Consider the following code:
import Combine
import Testing

struct CombineLatestTests {
    @Test
    func `combineLatest subscriber receives all values`() async throws {
        // Given
        let firstPublisher = CurrentValueSubject<Int, Never>(1)
        let secondPublisher = CurrentValueSubject<Bool, Never>(false)
        let values = firstPublisher
            .combineLatest(secondPublisher)
            .values
        var receivedValues: [(Int, Bool)] = []
        let task = Task {
            for await value in values {
                receivedValues.append(value)
            }
        }

        // When
        try await Task.sleep(for: .milliseconds(100))
        firstPublisher.value = 2
        secondPublisher.value = true
        try await Task.sleep(for: .milliseconds(100))
        task.cancel()
        let expectedValues = [(1, false), (2, false), (2, true)]

        // Then
        #expect(receivedValues.description == expectedValues.description)
    }
}
This test fails because receivedValues is [(1, false), (2, false)]. The last value, (2, true), never arrives. The failure rate is 100%, so this isn’t a race condition.
Adding a .buffer(size: 1, prefetch: .byRequest, whenFull: .dropOldest) between combineLatest and .values fixes the problem.
What am I doing wrong?
You're doing nothing wrong; buffer is the correct solution.
Publishers can model either "push" or "pull" semantics, but AsyncSequence (.values) can only model "pull".
That means that if your publisher chain can "push" (which your CurrentValueSubjects do), you will need a buffer before .values to capture "pushed" values that the iteration of the AsyncSequence is not yet ready to "pull". If you don't have the buffer, there's nowhere for these values to go, so they're simply dropped.
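In context, the fix looks like the sketch below. The buffer size of 64 is an arbitrary illustrative choice (the original post used 1); with whenFull: .dropOldest, values are still discarded if the buffer overflows.

```swift
import Combine

let first = CurrentValueSubject<Int, Never>(1)
let second = CurrentValueSubject<Bool, Never>(false)

// A buffer between the "push" upstream and the "pull" AsyncSequence holds
// values emitted while the for-await loop is between next() calls, instead
// of dropping them. Size 64 is an arbitrary choice for illustration.
let values = first
    .combineLatest(second)
    .buffer(size: 64, prefetch: .byRequest, whenFull: .dropOldest)
    .values
```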
Wow, that’s a foot gun and a half. And nothing about it in the docs for Publisher.values or AsyncPublisher.
Anyway, thanks for the explanation!
Note that in general .buffer(size: 1, ...) may not be sufficient; the question is "how many elements can the upstream publishers push between iterations of the for await loop?"
.buffer(size: 1, ...) is a leftover from my test code where I intentionally make the buffer small to test the backpressure handling. I will probably wrap the publisher in an AsyncStream anyway (which has unbounded buffer by default).
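One way to do that wrapping, as a hedged sketch — the stream(for:) helper name is mine, not an existing API:

```swift
import Combine

// Hypothetical helper (the name `stream(for:)` is illustrative): bridge a
// never-failing publisher into an AsyncStream. AsyncStream's default
// buffering policy is .unbounded, so every pushed value is retained until
// the consumer pulls it.
func stream<P: Publisher>(for publisher: P) -> AsyncStream<P.Output>
where P.Failure == Never {
    AsyncStream { continuation in
        let cancellable = publisher.sink { value in
            continuation.yield(value)
        }
        // Cancel the Combine subscription when the stream's consumer goes away.
        continuation.onTermination = { _ in
            cancellable.cancel()
        }
    }
}
```

With this, the test's loop would iterate stream(for: firstPublisher.combineLatest(secondPublisher)) and no longer miss the final value.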
It's very weird that if you add one more line, the final value is received:

    firstPublisher.value = 2
    firstPublisher.value = 3 // <- this line is new
    secondPublisher.value = true

Now receivedValues will be [(1, false), (2, false), (3, true)].
Theoretically speaking, when any of the upstreams of Publishers.CombineLatest produces a new value, it should result in Publishers.CombineLatest producing a new value, and finally the internal subscriber inside AsyncPublisher receiving a new value. In the end, a certain continuation tied to the .next() call of AsyncPublisher.AsyncIterator should be resumed.
However, just as your code shows, if the upstreams produce values too fast, the behavior is not that clear. I believe it has something to do with the concept of "demand". When we call .buffer(prefetch: .byRequest), it automatically creates an .unlimited demand.
I think this should be considered a bug; the values operator should automatically come with an .unlimited demand. I understand that intermediate values can be skipped, but the last value is a different matter.
The thing is, with no buffer in values, it can only process an upstream emission whilst a call to next() is suspended; at any other time it must drop the value.
Modeling that with a demand of 1 for each call to next is correct; anything else would lead to even more values being dropped in even more circumstances.
I think a better API would've been for values to have an internal buffer, and to accept a buffering behavior as an argument, and maybe even default to a 1-element or even infinite buffer. But that ship sailed, and Combine is dead, and now you have this thread instead.