CombineLatest dropping events from publishers on different threads?

I'm trying to understand why the following test sometimes fails:

func testCombineLatest() throws {

    let elementCount = 10
    let exp = XCTestExpectation()
    var lastValue = (0, 0)

    let publisherA = PassthroughSubject<Int, Never>()
    let queueA = DispatchQueue(label: "queueA")
    let publisherB = PassthroughSubject<Int, Never>()
    let queueB = DispatchQueue(label: "queueB")

    let queueC = DispatchQueue(label: "queueC")

    let cancellable = publisherA.combineLatest(publisherB)
        .receive(on: queueC)
        .sink(receiveCompletion: { _ in
            exp.fulfill()
        }, receiveValue: { value in
            lastValue = value
        })

    queueA.async {
        for i in (1...elementCount) {
            publisherA.send(i)
        }
        publisherA.send(completion: .finished)
    }

    queueB.async {
        for i in (1...elementCount) {
            publisherB.send(i)
        }
        publisherB.send(completion: .finished)
    }

    wait(for: [exp], timeout: 5)

    queueC.sync {
        XCTAssertTrue(lastValue == (elementCount, elementCount), "\(lastValue)")
    }

    cancellable.cancel()
}

I'd expect lastValue to always equal (10, 10), but about 6 out of 100 test runs fail with either (10, 9) or (9, 10).

I don't see any failures if I remove queueB and send all events from queueA. Based on this comment, I'd assumed that it was safe to use combineLatest with events from multiple threads. Is that assumption incorrect?

You are writing to lastValue on two different queues (queueA and queueB), and reading from it on the main thread, all with no synchronization. I don't think that's safe.

I've updated the code to read/write lastValue on a third queue, that should cover any potential thread safety issues right?

Edit: Assuming yes, I'm still seeing the same failures when running this version of the test.

Terms of Service

Privacy Policy

Cookie Policy