Publishers.CombineLatest thread safety

Combine is generally intended to follow the Reactive Streams spec, and rule 1.3 says

onSubscribe , onNext , onError and onComplete signaled to a Subscriber MUST be signaled serially.

In Combine terms, this means that it is a publisher's responsibility to call a subscriber's receive(subscription:), receive(_:) and receive(completion:) without overlap.

So, if CombineLatest follows the spec, then it serializes receive(_:) calls to its subscriber, even if its own inputs send it signals simultaneously on separate threads.

But, does it follow the spec? Here's a tiny test program:

import Combine

let ticket = Just("hello")
    .combineLatest(Just("world"))
    .sink(receiveValue: {
        print($0)
    })

If I run this with a breakpoint on the sink body, then I see this call stack at the breakpoint:

#0	0x0000000100001a36 in closure #1 in  at /Users/mayoff/TestProjects/combineTest/combineTest/main.swift:6
#1	0x0000000100001bac in thunk for @escaping @callee_guaranteed (@guaranteed String, @guaranteed String) -> () ()
#2	0x00007fff3496dbe1 in Subscribers.Sink.receive(_:) ()
#3	0x00007fff3496ddf0 in protocol witness for Subscriber.receive(_:) in conformance Subscribers.Sink<A, B> ()
#4	0x00007fff3497f733 in AbstractCombineLatest.receive(_:index:) ()
✂︎ additional frames omitted ✂︎

And if I then examine frame #4, I find the following code waiting to be executed:

->  0x7fff3497f733 <+771>:  mov    r13, rax
    0x7fff3497f736 <+774>:  mov    rdi, r14
    0x7fff3497f739 <+777>:  mov    rsi, qword ptr [rbp - 0xa0]
    0x7fff3497f740 <+784>:  mov    rax, qword ptr [rbp - 0xc0]
    0x7fff3497f747 <+791>:  call   qword ptr [rax + 0x8]
    0x7fff3497f74a <+794>:  mov    rdi, qword ptr [r12 + rbx]
    0x7fff3497f74e <+798>:  call   0x7fff34a18eec            ; symbol stub for: os_unfair_recursive_lock_unlock

The call at +791 is to tuple_destroy and is not of interest.

The call at +798, to os_unfair_recursive_lock_unlock, tells us that CombineLatest holds a recursive lock whilst calling a subscriber's receive(_:). If we assume the lock is associated with the subscription, then it prevents CombineLatest from calling a subscriber's receive(_:) simultaneously from multiple threads, thus obeying the spec.

3 Likes