In the code below, I believe you should expect to see:
RECEIVED (1, 0)
among the output after 1 second
import Combine
import Foundation
import _Concurrency
let subject = CurrentValueSubject<Int, Never>(0)
Task {
let firstPublisher = subject
.handleEvents(receiveRequest: { demand in print("Demand on First: \(demand.description)") })
let secondPublisher = Just(0)
.handleEvents(receiveRequest: { demand in print("Demand on Second: \(demand.description)") })
let faultyPublisher = Publishers.CombineLatest(firstPublisher, secondPublisher)
.handleEvents(
receiveOutput: { value in print("--------------------Value from Latest: \(value)") },
receiveRequest: { demand in print("Demand on Latest \(demand.description)") }
)
.filter { firstValue, _ in firstValue != 0 }
// .flatMap(maxPublishers: .unlimited) { Just($0) }
for await output in faultyPublisher.values {
print("RECEIVED \(output)")
}
}
Task {
try await Task.sleep(nanoseconds: 1_000_000_000)
subject.send(1)
}
However, what I see instead is that print("RECEIVED \(output)") never gets executed.
I believe this has something to do with how the demand is generated as uncommenting the .flatMap line makes it work as expected.
Some other observations:
- If call
.sink on faultyPublisher the sink will receive (1, 0) as expected.
- If you remove
.filter everything works as expected.
- If you do
firstValue == 0 instead of firstValue != 0 everything works as expected.
- If you don't use
Publishers.CombineLatest and instead you use the .filter line on firstPublisher everything works as expected.
I believe this is a bug with values: AsyncPublisher. Is that right?
nolanw
2
I'm trying to figure out a similar interaction involving AsyncPublisher, CombineLatest, and Filter. I have two further observations:
I think uncommenting your flatMap works because it requests .unlimited, same as .buffer(size: 1, prefetch: .byRequest, whenFull: .dropOldest) or .first() (if you only care about one element).
At this point I'm thoroughly confused, and suspicious of both AsyncPublisher and CombineLatest.