With the following code, after I add receive(on: backgroundQueue), the receiveCompletion block is called 100% of the time, but the receiveValue block is not.
    xxxxPublisher
        .xxxx()
        .receive(on: backgroundQueue)
        .xxxx()
        .receive(on: DispatchQueue.main)
        .sink(receiveCompletion: { completion in
            // completion code
        }, receiveValue: { value in
            // receive value code
        }).store(in: &cancellables)
This does not seem like correct behavior. Are we not supposed to use receive<S>(on scheduler: S, options: S.SchedulerOptions? = nil) in this way? Am I missing something?
Here is code that reproduces the problem. If you run it, you will see that receiveCompletion is called exactly 300 times, but receiveValue is called fewer than 300 times.
    import Foundation
    import Combine

    private var cancellables: Set<AnyCancellable> = []
    let backgroundQueue = DispatchQueue.global(qos: .background)

    for i in 1...300 {
        runPublisher(i)
    }

    var sinkCompletedIndices = Set<Int>()
    var sinkOutputIndices = Set<Int>()

    func runPublisher(_ index: Int) {
        [1].publisher
            .receive(on: backgroundQueue)
            .receive(on: DispatchQueue.main)
            .sink(receiveCompletion: { completion in
                NSLog("sink receiveCompletion")
                sinkCompletedIndices.insert(index)
            }, receiveValue: { value in
                NSLog("sink receiveValue")
                sinkOutputIndices.insert(index)
            }).store(in: &cancellables)
    }

    DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 2) {
        let diff = sinkCompletedIndices.filter { !sinkOutputIndices.contains($0) }
        NSLog("Difference between completions and outputs \(diff)")
    }

    RunLoop.main.run()
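One variable that may be worth isolating: DispatchQueue.global(qos:) returns a concurrent queue, so work items scheduled on it are not guaranteed to run in the order they were submitted. The sketch below is a comparison run, not a confirmed fix. It is the same pipeline, but with a private serial queue in place of the global one. The queue label and the names serialQueue, runSerialPublisher, completedIndices and outputIndices are made up for illustration. If the difference set comes back empty here, that would suggest the missing values are tied to the concurrent queue rather than to receive(on:) itself.

```swift
import Foundation
import Combine

// Assumption: a private serial queue (hypothetical label) replaces
// DispatchQueue.global(qos: .background), which is a concurrent queue.
let serialQueue = DispatchQueue(label: "com.example.receive-serial", qos: .background)

var cancellables: Set<AnyCancellable> = []
var completedIndices = Set<Int>()
var outputIndices = Set<Int>()

func runSerialPublisher(_ index: Int) {
    [1].publisher
        .receive(on: serialQueue)          // serial: work items run in submission order
        .receive(on: DispatchQueue.main)   // both sink closures run on the main queue
        .sink(receiveCompletion: { _ in
            completedIndices.insert(index)
        }, receiveValue: { _ in
            outputIndices.insert(index)
        })
        .store(in: &cancellables)
}

for i in 1...300 {
    runSerialPublisher(i)
}

// Same check as in the reproduction: completions that never saw a value.
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    let missing = completedIndices.subtracting(outputIndices)
    NSLog("Completions without a matching output: \(missing.sorted())")
    exit(0)
}

RunLoop.main.run()
```

Only the scheduler passed to the first receive(on:) changes between the two runs, so any difference in the logged set can be attributed to that one change.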