tokijh
(Yoon Joonghyun)
Does receive(on:) have a performance limit?
With a concurrent DispatchQueue as the scheduler, as in the test code below, the sink is not called for every value.
func testConcurrentDispatchQueueScheduler() {
    // given
    let concurrentQueue = DispatchQueue(label: "", qos: .default, attributes: .concurrent)
    let lock = NSLock()
    var count: Int = 0
    var cancellables: Set<AnyCancellable> = []
    // when
    for _ in 0..<100 {
        Just(Void())
            .receive(on: concurrentQueue)
            .sink { _ in
                lock.lock(); defer { lock.unlock() } // make the increment thread-safe
                count += 1
            }
            .store(in: &cancellables)
    }
    XCTWaiter().wait(for: [XCTestExpectation()], timeout: 1) // wait
    // then
    XCTAssertEqual(count, 100) // XCTAssertEqual failed: ("64") is not equal to ("100")
}
However, there is no problem when I use a serial DispatchQueue with receive(on:), or when I call queue.async { } inside the sink instead of using receive(on:) (see below).
func testSerialDispatchQueueScheduler() {
    // given
    let serialQueue = DispatchQueue(label: "", qos: .default)
    let lock = NSLock()
    var count: Int = 0
    var cancellables: Set<AnyCancellable> = []
    // when
    for _ in 0..<100 {
        Just(Void())
            .receive(on: serialQueue)
            .sink { _ in
                lock.lock(); defer { lock.unlock() } // make the increment thread-safe
                count += 1
            }
            .store(in: &cancellables)
    }
    XCTWaiter().wait(for: [XCTestExpectation()], timeout: 1) // wait
    // then
    XCTAssertEqual(count, 100) // succeeded
}
func testConcurrentDispatchQueueAsync() {
    // given
    let concurrentQueue = DispatchQueue(label: "", qos: .default, attributes: .concurrent)
    let lock = NSLock()
    var count: Int = 0
    var cancellables: Set<AnyCancellable> = []
    // when
    for _ in 0..<100 {
        Just(Void())
            .sink { _ in
                concurrentQueue.async {
                    lock.lock(); defer { lock.unlock() } // make the increment thread-safe
                    count += 1
                }
            }
            .store(in: &cancellables)
    }
    XCTWaiter().wait(for: [XCTestExpectation()], timeout: 1) // wait
    // then
    XCTAssertEqual(count, 100) // succeeded
}
By the way, the problem with combining a concurrent queue and receive(on:) is that values may be dropped. Combine goes to great lengths to make this "safe enough" that it often does not matter, but you most likely just want a serial queue, since that is the well-behaved territory with no undefined behavior.
The lock in the sink is not needed: by contract, the values received by .sink are emitted atomically.
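As a rough sketch of that advice, the serial-queue variant can be written without the NSLock. The helper name `countDeliveries`, the queue label, and the use of a `DispatchGroup` in place of the one-second wait are assumptions made for this illustration and do not come from the original tests.

```swift
import Combine
import Dispatch

/// Hypothetical helper: subscribes `n` one-shot publishers that deliver on a
/// serial queue and returns how many values the sinks received.
func countDeliveries(_ n: Int) -> Int {
    let serialQueue = DispatchQueue(label: "example.serial") // serial by default
    let group = DispatchGroup()
    var count = 0
    var cancellables: Set<AnyCancellable> = []

    for _ in 0..<n {
        group.enter()
        Just(())
            .receive(on: serialQueue)
            .sink { _ in
                count += 1      // only mutated on serialQueue, so no lock here
                group.leave()
            }
            .store(in: &cancellables)
    }

    // Wait for the deliveries instead of sleeping for a fixed second.
    // The timeout only guards against hanging if a value never arrives.
    _ = group.wait(timeout: .now() + 5)

    // Read the counter on the same queue that mutates it.
    return serialQueue.sync { count }
}
```

Waiting on the group rather than a fixed timeout also keeps the check from depending on how quickly the queue drains.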
My guess is that the vast majority of what you are measuring here is the setup time of each stream and not the delivery speed of the stream (the latter is likely the more meaningful measurement).
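One way to look at delivery rather than setup might be to push all the values through a single subscription instead of creating one pipeline per value. This is only a sketch: the helper name `deliverThroughOneStream`, the queue label, and the semaphore-based wait are illustrative assumptions, not part of the original tests.

```swift
import Combine
import Dispatch

/// Hypothetical helper: sends `n` values through ONE pipeline on a serial
/// queue, so the setup cost is paid once rather than once per value.
func deliverThroughOneStream(_ n: Int) -> Int {
    let serialQueue = DispatchQueue(label: "example.delivery") // serial by default
    let done = DispatchSemaphore(value: 0)
    var count = 0

    let cancellable = (0..<n).publisher
        .receive(on: serialQueue)
        .sink(
            receiveCompletion: { _ in done.signal() }, // fires after the last value
            receiveValue: { _ in count += 1 }          // only runs on serialQueue
        )

    // Block until completion; the timeout only guards against a hang.
    _ = done.wait(timeout: .now() + 5)
    cancellable.cancel()

    // Read the counter on the queue that mutates it.
    return serialQueue.sync { count }
}
```

Timing a call such as `deliverThroughOneStream(100)` against the loop of 100 separate `Just` pipelines would give a rough idea of how much of the cost is per-subscription setup.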