Combine is generally intended to follow the Reactive Streams spec, and rule 1.3 says
onComplete signaled to a
Subscriber MUST be signaled serially.
In Combine terms, this means that it is a publisher's responsibility to call a subscriber's
receive(completion:) without overlap.
CombineLatest follows the spec, then it serializes
receive(_:) calls to its subscriber, even if its own inputs send it signals simultaneously on separate threads.
But, does it follow the spec? Here's a tiny test program:
let ticket = Just("hello")
If I run this with a breakpoint on the
sink body, then I see this call stack at the breakpoint:
#0 0x0000000100001a36 in closure #1 in at /Users/mayoff/TestProjects/combineTest/combineTest/main.swift:6
#1 0x0000000100001bac in thunk for @escaping @callee_guaranteed (@guaranteed String, @guaranteed String) -> () ()
#2 0x00007fff3496dbe1 in Subscribers.Sink.receive(_:) ()
#3 0x00007fff3496ddf0 in protocol witness for Subscriber.receive(_:) in conformance Subscribers.Sink<A, B> ()
#4 0x00007fff3497f733 in AbstractCombineLatest.receive(_:index:) ()
✂︎ additional frames omitted ✂︎
And if I then examine frame #4, I find the following code waiting to be executed:
-> 0x7fff3497f733 <+771>: mov r13, rax
0x7fff3497f736 <+774>: mov rdi, r14
0x7fff3497f739 <+777>: mov rsi, qword ptr [rbp - 0xa0]
0x7fff3497f740 <+784>: mov rax, qword ptr [rbp - 0xc0]
0x7fff3497f747 <+791>: call qword ptr [rax + 0x8]
0x7fff3497f74a <+794>: mov rdi, qword ptr [r12 + rbx]
0x7fff3497f74e <+798>: call 0x7fff34a18eec ; symbol stub for: os_unfair_recursive_lock_unlock
The call at +791 is to
tuple_destroy and is not of interest.
The call at +798, to
os_unfair_recursive_lock_unlock, tells us that
CombineLatest holds a recursive lock whilst calling a subscriber's
receive(_:). If we assume the lock is associated with the subscription, then it prevents
CombineLatest from calling a subscriber's
receive(_:) simultaneously from multiple threads, thus obeying the spec.