RunLoop.main or DispatchQueue.main when using Combine scheduler?

When I look at the documentation of Combine framework, I see an example on the receive(on:) method.

///     let jsonPublisher = MyJSONLoaderPublisher() // Some publisher.
///     let labelUpdater = MyLabelUpdateSubscriber() // Some subscriber that updates the UI.
///
///     jsonPublisher
///         .subscribe(on: backgroundQueue)
///         .receiveOn(on: RunLoop.main)
///         .subscribe(labelUpdater)
public func receive<S>(on scheduler: S, options: S.SchedulerOptions? = nil) -> Publishers.ReceiveOn<Self, S> where S : Scheduler

I'm curious what's the difference between RunLoop.main or DispatchQueue.main as the scheduler when dispatching to the main queue.

.receive(on: RunLoop.main)

v.s.

.receive(on: DispatchQueue.main)

I found a same question on StackOverflow but with no answer.

BTW, I believe there is a typo in the official documentation?

///     jsonPublisher
///         .subscribe(on: backgroundQueue)
///         .receiveOn(on: RunLoop.main)  <------ should be receive(on: RunLoop.main)
6 Likes

RunLoop.main as a Scheduler ends up calling RunLoop.main.perform whereas DispatchQueue.main calls DispatchQueue.main.async to do work, for practical purposes they are nearly isomorphic. The only real differential is that the RunLoop call ends up being executed in a different spot in the RunLoop callouts whereas the DispatchQueue variant will perhaps execute immediately if optimizations in libdispatch kick in. In reality you should never really see a difference tween the two.

16 Likes

Then, a follow-up question would be: Why have two? Or, "Under what circumstances should I use one versus the other?"

3 Likes

RunLoop should be when you have a dedicated thread with a RunLoop running, DispatchQueue can be any queue scenario (and for the record please avoid running RunLoops in DispatchQueues, it causes some really gnarly resource usage...). Also it is worth noting that the DispatchQueue used as a scheduler must always be serial to adhere to the contracts of Combine's operators.

7 Likes

Thanks, now I have a better understanding whether to use RunLoop or DispatchQueue as Scheduler.

I was wondering how people would be able to learn this from the documentation? (Choosing Dispatch Queue or RunLoop)
And It seems like neither extension DispatchQueue: Scheduler { ... } nor protocol Scheduler { ... } mentions about the serial queue requirement for Combine's operators.

I tried to use a concurrent queue as the scheduler but there is no warning in the console or crash to help programmers avoid this.

let concurrentQueue = DispatchQueue(label: "ConcurrentQueue", qos: .default, attributes: .concurrent)

Just(10)
    .subscribe(on: concurrentQueue)
    .map { "\($0)" }
    .receive(on: RunLoop.main)
    .sink { value in print(value) }
1 Like

Is the constraint to use .serial vs. .concurrent DispatchQueue for subscribe() mentioned anywhere? I've been scouring the docs, and like @royhsu I didn't see any compilation warning or crash when I used .concurrent

1 Like

Is this really true? Then you couldn‘t offload any heavy work to the global concurrent queues.

I think the point you tried to make here was that the reactive streams can derail quickly if the user don‘t understand how they should be properly synchronized.

For anyone interested, here is the difference between subscribeOn and observeOn operators in RxSwift, which is fairly similar to what Combine has. People always get confused about them.

subscribeOn is rarely needed.

1 Like

My guess is you can still target global queues

let backgroundSerialQueue = DispatchQueue(
    label: "SerialQueue",
    target: .global(qos: .background) // <--- target to global queue
)

Just(10)
    .subscribe(on: backgroundSerialQueue)
    .map { "\($0)" }
    .receive(on: RunLoop.main)
    .sink { value in print(value) }

But maybe I'm wrong.

You can really use any queue you want as long as you understand the implications. ;) If that wasn‘t the case then DispatchQueue wouldn‘t be allowed as a scheduler.

However what I‘m interested in is, to know if receiving on the main RunLoop also does do anything about the run mode.

For example, timers are paused if you start scrolling a scrollview unless the timer is added to the run loop in the common mode.

Is there any similarity here for combine operations that are based on timer emission?

1 Like

So to clarify the commentary about serial queues only applies to the beta SDKs. It is something we are considering addressing systemically in the operators that take schedulers. That is a careful balance between the cost of locking versus the cost of correctness for exclusivity of streams: there are other prior art that are similar here: reactive-streams-jvm/README.md at v1.0.2 · reactive-streams/reactive-streams-jvm · GitHub specifically rule 1.03 requires the events to be thread safe.

Per the RunLoop those are currently scheduled in the common mode. I would be interested if there is actually a use case of having options to use a different mode. Please file a feedback/radar that is something that we should address if it has a serious use case.

3 Likes

sadly there is no real way to detect if a given queue passed in is concurrent or serial from my understanding.

1 Like

RxSwift solves this by creating a serial queue which targets the queue of unknown kind. So you always have a serial dispatch of events, on the queue you specified, even if it is not serial.

1 Like

Not every scheduler does this: RxSwift/ConcurrentDispatchQueueScheduler.swift at main · ReactiveX/RxSwift · GitHub

What you described was the serial dispatch queue scheduler:

The problem is that it‘s not possible to obtain information about the queue you passed in and to guarantee the serialization they use an internal serial queue as a target.

I don‘t have any use case for other target modes, but when Combine was introduced and both RunLoop and DispatchQueue were valid scheduler types I asked myself if theoretically a reactive sequence could be paused just like a timer which is not added to the run loops common mode when you rout it through DispatchQueue.main instead of RunLoop.main, but I don‘t know the details about the internal implementation of Dispatch in that regard either.

This topic is all about incorrect naming convention, right?
subscribe() and subscribeOn() for me are too close. Maybe it is time to fix this?

Please read the original post and the discussion. This topic is not about any naming conventions. Furthermore subscribe and subscribe(on:) serve two distinct purposes but the operation names remain exact to the point.

1 Like

You are right.
I have extrapolated the post observeOn vs. subscribeOn // rx_marin<blog> on Combine.

What does it tell?
That subscribe() creates observer and subscribeOn() provides a thread/queue, on which subscription code happens.

So to follow up here, there are some changes incoming in the regards to the way downstream events are propagated. We are now able to satisfy the constraint of 1.03 even if the DispatchQueue is concurrent or the OperationQueue is not a restriction of maxConcurrentOperations of 1, or for that matter any valid scheduler being concurrent; we will always send serialized events on that requested scheduler for .receive(on:). The one remaining caveat that we deviate from the specification slightly is that upstream events such as cancel() and request(_:) in our world can happen concurrently. That being said we do handle them in a thread safe manner.

5 Likes