`Sink.receiveValue` is not called after `publisher.receive(on: backgroundQueue)`

With the following code, after I put receive(on: backgroundQueue), the receiveCompletion will be called 100%, but the receiveValue block is not.

xxxxPublisher
    .xxxx()
    .receive(on: backgroundQueue)
    .xxxx()
    .receive(on: DispatchQueue.main)
    .sink(receiveCompletion: { completion in
        // completion code
    }, receiveValue: { value in
        // receive value code
    }).store(in: &cancellables)

This seems not a good behavior, are we supposed to not use receive<S>(on scheduler: S, options: S.SchedulerOptions? = nil) in this way? Am I missing something?

Here is the code to reproduce this bug, if you run this code, you will see that receiveCompletion is called exactly 300 times, but receiveValue is called less than 300.

import Foundation
import Combine

private var cancellables: Set<AnyCancellable> = []
let backgroundQueue = DispatchQueue.global(qos: .background)
for i in 1...300 {
    runPublisher(i)
}
var sinkCompletedIndices = Set<Int>()
var sinkOutputIndices = Set<Int>()

func runPublisher(_ index: Int) {
    [1].publisher
    .receive(on: backgroundQueue)
    .receive(on: DispatchQueue.main)
    .sink(receiveCompletion: { completion in
        NSLog("sink receiveCompletion")
        sinkCompletedIndices.insert(index)
    }, receiveValue: { value in
        NSLog("sink receiveValue")
        sinkOutputIndices.insert(index)
    }).store(in: &cancellables)
}
DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 2) {
    let diff = sinkCompletedIndices.filter { !sinkOutputIndices.contains($0) }
    NSLog("Difference between completions and outputs \(diff)")
}
RunLoop.main.run()
1 Like

Could you find a solution for this issue?

I experience a similar issue with an operator implemented in one of my projects:

public func await(timeout: TimeInterval = 10, on queue: DispatchQueue = .global(qos: .background)) -> Result<[Output], Error> {
        var values = [Output]()
        var completion: Subscribers.Completion<Failure>?

        let semaphore = DispatchSemaphore(value: 0)
        let cancellable = self
            .receive(on: queue)
            .sink { result in
                completion = result
                semaphore.signal()
            } receiveValue: { value in
                values.append(value)
            }
        _ = semaphore.wait(timeout: .now() + .milliseconds(Int(timeout * 1000)))

        switch completion {
        case .failure(let error):
            return .failure(error)
        case .finished:
            return .success(values)
        case nil:
            // If we enter this code path, then our test has
            // already been marked as failing, since our
            // expectation was never fullfilled.
            cancellable.cancel()
            return .failure(AwaitError.timeout)
        }
    }

    public enum AwaitError: Error {
        case timeout
    }

Sometimes, receiveCompletion is called before receiveValue is called.
Thus in this case the values Array is empty when it enters the switch.

In our application this issue appears indeterministic.

Terms of Service

Privacy Policy

Cookie Policy