In the code below, I believe you should expect to see:
RECEIVED (1, 0)
among the output after 1 second

import Combine
import Foundation
import _Concurrency

let subject = CurrentValueSubject<Int, Never>(0)

Task {
    let firstPublisher = subject
        .handleEvents(receiveRequest: { demand in print("Demand on First: \(demand.description)") })
    let secondPublisher = Just(0)
        .handleEvents(receiveRequest: { demand in print("Demand on Second: \(demand.description)") })
    let faultyPublisher = Publishers.CombineLatest(firstPublisher, secondPublisher)
        .handleEvents(
            receiveOutput: { value in print("--------------------Value from Latest: \(value)") },
            receiveRequest: { demand in print("Demand on Latest \(demand.description)") }
        )
        .filter { firstValue, _ in firstValue != 0 }
    //    .flatMap(maxPublishers: .unlimited) { Just($0) }
    for await output in faultyPublisher.values {
        print("RECEIVED \(output)")
    }
}

Task {
    try await Task.sleep(nanoseconds: 1_000_000_000)
    subject.send(1)
}

However, what I see instead is that print("RECEIVED \(output)") never gets executed.

I believe this has something to do with how the demand is generated as uncommenting the .flatMap line makes it work as expected.

Some other observations:

  1. If call .sink on faultyPublisher the sink will receive (1, 0) as expected.
  2. If you remove .filter everything works as expected.
  3. If you do firstValue == 0 instead of firstValue != 0 everything works as expected.
  4. If you don't use Publishers.CombineLatest and instead you use the .filter line on firstPublisher everything works as expected.

I believe this is a bug with values: AsyncPublisher. Is that right?

I'm trying to figure out a similar interaction involving AsyncPublisher, CombineLatest, and Filter. I have two further observations:

  • Moving the filter to the AsyncSequence still fails. i.e.:
    1. Remove .filter from faultyPublisher
    2. Change the for loop to:
    for await output in faultyPublisher.values.filter({ firstValue, _ in firstValue != 0 })
    
  • Swapping the parameters passed to CombineLatest (and testing for secondValue != 0 in the filter) makes it work (?!).

I think uncommenting your flatMap works because it requests .unlimited, same as .buffer(size: 1, prefetch: .byRequest, whenFull: .dropOldest) or .first() (if you only care about one element).

At this point I'm thoroughly confused, and suspicious of both AsyncPublisher and CombineLatest.