Publishers.CombineLatest thread safety

Hi,

The combineLatest operator combines values from two upstream publishers, which may emit on two different threads. So the transform closure may be called from two different threads. Is this thread-safe, or do I have to make sure that transform is always called from the same thread?

Greetings,
Johannes

I don't think transform closures are a good place to execute side effects. In reactive programming, such closures should be pure functions, i.e., they should not have any side effects. In that case, you shouldn't need to care which thread they're called from.

I am not talking about side effects. The arguments to the transform closure come from the CombineLatest publisher. If the first argument arrives from thread A, then transform is executed on thread A. If the second argument arrives from thread B, then transform is executed on thread B. So transform combines values produced on two different threads, and is itself called on different threads.
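The situation described here can be observed directly. The overload `combineLatest(_:_:)` that takes a transform returns `Publishers.Map<Publishers.CombineLatest<Self, P>, T>`, so the transform runs downstream of CombineLatest, on whichever thread delivered the triggering value. Below is a minimal sketch, assuming an Apple platform where Combine is available; the queue labels, `queueKey`, and `transformQueues` are illustrative names only. Each queue is tagged with `setSpecific(key:value:)` so the transform can report which queue it is running on.

```swift
import Combine
import Dispatch

// Tag two serial queues so the transform can report where it runs.
let queueKey = DispatchSpecificKey<String>()
let queueA = DispatchQueue(label: "A")
let queueB = DispatchQueue(label: "B")
queueA.setSpecific(key: queueKey, value: "A")
queueB.setSpecific(key: queueKey, value: "B")

let first = PassthroughSubject<Int, Never>()
let second = PassthroughSubject<Int, Never>()

// Only mutated from the sequential sync calls below, so no race here.
var transformQueues: [String] = []

let cancellable = first.combineLatest(second) { a, b -> Int in
    // Runs on whichever queue delivered the value that triggered it.
    transformQueues.append(DispatchQueue.getSpecific(key: queueKey) ?? "unknown")
    return a + b
}
.sink { _ in }

queueA.sync { first.send(1) }   // no output yet: second has no value
queueB.sync { second.send(2) }  // transform runs on queue B
queueA.sync { first.send(3) }   // transform runs on queue A
print(transformQueues)          // ["B", "A"]
```

Here the sends are sequential, so there is no contention; the open question is what happens when both queues send at the same moment.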

Because of that, CombineLatest must somehow handle thread safety. That's my question: is CombineLatest thread-safe?

Combine is generally intended to follow the Reactive Streams spec, and rule 1.3 says:

onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.

In Combine terms, this means that it is a publisher's responsibility to call a subscriber's receive(subscription:), receive(_:) and receive(completion:) without overlap.

So, if CombineLatest follows the spec, then it serializes receive(_:) calls to its subscriber, even if its own inputs send it signals simultaneously on separate threads.
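One way to check this empirically is a subscriber that notices whether two of its `receive` calls are ever in flight at once. The sketch below is a hypothetical test helper (`OverlapDetector` is not part of Combine), assuming an Apple platform with Combine. Each subject is driven from its own global-queue thread, so CombineLatest receives inputs from two threads simultaneously, while each individual subject is still sent to serially.

```swift
import Combine
import Foundation

// Hypothetical test helper (not part of Combine): records whether two
// receive calls were ever in flight at the same time (rule 1.3 violation).
final class OverlapDetector<Input>: Subscriber {
    typealias Failure = Never

    private let lock = NSLock()
    private var active = 0
    private var sawOverlap = false
    private var received = 0

    var overlapDetected: Bool { lock.lock(); defer { lock.unlock() }; return sawOverlap }
    var valueCount: Int { lock.lock(); defer { lock.unlock() }; return received }

    private func enter() {
        lock.lock()
        active += 1
        if active > 1 { sawOverlap = true }
        lock.unlock()
    }

    private func leave() {
        lock.lock()
        active -= 1
        lock.unlock()
    }

    func receive(subscription: Subscription) {
        // Not tracked: request(_:) may legally re-enter receive(_:) on this thread.
        subscription.request(.unlimited)
    }

    func receive(_ input: Input) -> Subscribers.Demand {
        enter()
        defer { leave() }
        Thread.sleep(forTimeInterval: 0.0001) // widen the window for overlaps
        lock.lock(); received += 1; lock.unlock()
        return .none
    }

    func receive(completion: Subscribers.Completion<Never>) {
        enter()
        defer { leave() }
    }
}

let first = PassthroughSubject<Int, Never>()
let second = PassthroughSubject<Int, Never>()
let detector = OverlapDetector<(Int, Int)>()
first.combineLatest(second).subscribe(detector)

// Give both inputs a value so later sends produce outputs.
first.send(0)
second.send(0)

// Drive each subject from its own thread, concurrently.
let group = DispatchGroup()
DispatchQueue.global().async(group: group) { for i in 1...1_000 { first.send(i) } }
DispatchQueue.global().async(group: group) { for i in 1...1_000 { second.send(i) } }
group.wait()

print("overlap detected:", detector.overlapDetected)
print("values received:", detector.valueCount)
```

If CombineLatest serializes its downstream calls as the spec requires, `overlapDetected` stays `false`. A passing run is evidence, not proof, since a race might simply not have been hit.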

But, does it follow the spec? Here's a tiny test program:

import Combine

let ticket = Just("hello")
    .combineLatest(Just("world"))
    .sink(receiveValue: {
        print($0)
    })

If I run this with a breakpoint on the sink body, then I see this call stack at the breakpoint:

#0	0x0000000100001a36 in closure #1 in  at /Users/mayoff/TestProjects/combineTest/combineTest/main.swift:6
#1	0x0000000100001bac in thunk for @escaping @callee_guaranteed (@guaranteed String, @guaranteed String) -> () ()
#2	0x00007fff3496dbe1 in Subscribers.Sink.receive(_:) ()
#3	0x00007fff3496ddf0 in protocol witness for Subscriber.receive(_:) in conformance Subscribers.Sink<A, B> ()
#4	0x00007fff3497f733 in AbstractCombineLatest.receive(_:index:) ()
✂︎ additional frames omitted ✂︎

And if I then examine frame #4, I find the following code waiting to be executed:

->  0x7fff3497f733 <+771>:  mov    r13, rax
    0x7fff3497f736 <+774>:  mov    rdi, r14
    0x7fff3497f739 <+777>:  mov    rsi, qword ptr [rbp - 0xa0]
    0x7fff3497f740 <+784>:  mov    rax, qword ptr [rbp - 0xc0]
    0x7fff3497f747 <+791>:  call   qword ptr [rax + 0x8]
    0x7fff3497f74a <+794>:  mov    rdi, qword ptr [r12 + rbx]
    0x7fff3497f74e <+798>:  call   0x7fff34a18eec            ; symbol stub for: os_unfair_recursive_lock_unlock

The call at +791 is to tuple_destroy and is not of interest.

The call at +798, to os_unfair_recursive_lock_unlock, tells us that CombineLatest holds a recursive lock whilst calling a subscriber's receive(_:). If we assume the lock is associated with the subscription, then it prevents CombineLatest from calling a subscriber's receive(_:) simultaneously from multiple threads, thus obeying the spec.

Correct: all of the operators that can receive input from more than one upstream need a "downstream lock" to prevent calling their subscriber with two values (or a value and a completion) simultaneously.
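The "downstream lock" idea can be sketched outside Combine. The following is a hypothetical, simplified two-input combiner (`LatestPairCombiner` is an illustrative name, not Combine's source) that uses Foundation's `NSRecursiveLock` in place of the private `os_unfair_recursive_lock` seen in the disassembly. It holds the lock while storing the new value and while calling downstream, so downstream never runs on two threads at once, even when both inputs arrive concurrently.

```swift
import Foundation

// Hypothetical sketch, not Combine's implementation: a two-input
// "combine latest" stage guarded by a downstream lock.
final class LatestPairCombiner<A, B> {
    // Recursive, so a downstream that synchronously feeds a value back into
    // receiveA/receiveB on the same thread does not deadlock.
    private let lock = NSRecursiveLock()
    private var latestA: A?
    private var latestB: B?
    private let downstream: (A, B) -> Void

    init(downstream: @escaping (A, B) -> Void) {
        self.downstream = downstream
    }

    func receiveA(_ value: A) {
        lock.lock()
        defer { lock.unlock() }
        latestA = value
        emitIfReady()
    }

    func receiveB(_ value: B) {
        lock.lock()
        defer { lock.unlock() }
        latestB = value
        emitIfReady()
    }

    // Always called with the lock held, so downstream calls never overlap.
    private func emitIfReady() {
        guard let a = latestA, let b = latestB else { return }
        downstream(a, b)
    }
}

// Downstream state with no lock of its own: safe because the combiner
// serializes every downstream call.
var outputs = 0
var inDownstream = false
var overlapped = false

let combiner = LatestPairCombiner<Int, Int> { _, _ in
    if inDownstream { overlapped = true }
    inDownstream = true
    outputs += 1
    inDownstream = false
}

combiner.receiveA(0)
combiner.receiveB(0) // first output: both inputs now have a value

let group = DispatchGroup()
DispatchQueue.global().async(group: group) { for i in 1...1_000 { combiner.receiveA(i) } }
DispatchQueue.global().async(group: group) { for i in 1...1_000 { combiner.receiveB(i) } }
group.wait()

print(overlapped, outputs) // false 2001
```

A plain `NSLock` would serialize calls just as well here; the recursive variant only matters if downstream re-enters the operator on the same thread, which is one plausible reason for the recursive lock observed in the disassembly.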

To be clear, this is done in Combine's implementation, so you don't have to do anything extra yourself to get valid behavior.


Thanks guys for your explanation :+1: