Performance limit on receive(on:)?

Does receive(on:) have a performance limit?


Using ConcurrentDispatchQueue, as in the test code below, the sink is not fully called.

func testConcurrentDispatchQueueScheduler() {
  // given
  let concurrentQueue = DispatchQueue(label: "", qos: .default, attributes: .concurrent)
  let lock = NSLock()
  var count: Int = 0
  var cancellables: Set<AnyCancellable> = []

  // when
  for _ in 0..<100 {
    Just(Void())
      .receive(on: concurrentQueue)
      .sink { _ in
        lock.lock(); defer { lock.unlock() } // make thread safely
        count += 1
      }
      .store(in: &cancellables)
  }

  XCTWaiter().wait(for: [XCTestExpectation()], timeout: 1) // wait

  // then
  XCTAssertEqual(count, 100) // XCTAssertEqual failed: ("64") is not equal to ("100")
}

But, when use SerialDispatchQueue or queue.async { } instead of receive(on:) has not problem. (See below).

func testSerialDispatchQueueScheduler() {
  // given
  let serialQueue = DispatchQueue(label: "", qos: .default)
  let lock = NSLock()
  var count: Int = 0
  var cancellables: Set<AnyCancellable> = []
  
  // when
  for _ in 0..<100 {
    Just(Void())
      .receive(on: serialQueue)
      .sink { _ in
        lock.lock(); defer { lock.unlock() } // make thread safely
        count += 1
      }
      .store(in: &cancellables)
  }
  
  XCTWaiter().wait(for: [XCTestExpectation()], timeout: 1) // wait
  
  // then
  XCTAssertEqual(count, 100) // succeeded
}
func testConcurrentDispatchQueueAsync() {
  // given
  let concurrentQueue = DispatchQueue(label: "", qos: .default, attributes: .concurrent)
  let lock = NSLock()
  var count: Int = 0
  var cancellables: Set<AnyCancellable> = []

  // when
  for _ in 0..<100 {
    Just(Void())
      .sink { _ in
        concurrentQueue.async {
          lock.lock(); defer { lock.unlock() } // make thread safely
          count += 1
        }
      }
      .store(in: &cancellables)
  }

  XCTWaiter().wait(for: [XCTestExpectation()], timeout: 1) // wait

  // then
  XCTAssertEqual(count, 100) // succeeded
}

The problem with concurrent queues and receive(on:) is that you may drop a value btw. Combine goes to great lengths to make it "safe enough" that it often does not matter... but you likely just want a serial queue (being that is the well behaved non UB territory).

The lock in the sink is not needed, by contract the values received by .sink are atomically emitted.

My guess is that the vast majority of the perf you are measuring here is the setup time of the stream and not the delivery speed of the stream (the latter is likely the more meaningful measurement).

Terms of Service

Privacy Policy

Cookie Policy