"PassthroughSubject" seems to be thread-unsafe, is this a bug or limitation?

Please see the code below, I'm sending 100 values concurrently to a subscriber which only request .max(5) . Subscriber should only get 5 values, right? but it actually got more. Is this a bug or limitation? Did I misunderstand something? Thank you in advance!

// Xcode11 beta2

var count = 0
let q = DispatchQueue(label: UUID().uuidString)
let g = DispatchGroup()

let subject = PassthroughSubject<Int, Never>()
let subscriber = AnySubscriber<Int, Never>(receiveSubscription: { (s) in
    s.request(.max(5))
}, receiveValue: { v in
    q.sync {
        count += 1
    }
    return .none
}, receiveCompletion: { c in
})
subject.subscribe(subscriber)

for i in 0..<100 {
    DispatchQueue.global().async(group: g) {
        subject.send(i)
    }
}

g.wait()
print("receive", count)  // expect 5, but got more(7, 9...)

Please correct me if I'm wrong, but I don't recall seeing any thread safety guarantees for Combine.

Can you add receive(on:) and then you can probably clean up your other sync code.

There is also a subscribe(on:) operator if you want to specify where the subscription should take place.

Here is a good example how this works in RxSwift: http://rx-marin.com/post/observeon-vs-subscribeon/

Thank you for response! I know receive(on:), but because the framework doesn't have a built-in queue scheduler, I use a serial dispatch queue to keep the counts consistent.
So I guess PassthroughSubject cannot guarantee that subscribers will receive the expected number of values?

It would be pretty disappointing if Combine ships a publisher that doesn’t uphold the backpressure contract. This is something that is worthy of a bug report IMO.

I would guess that this is intentional rather than a bug. If weak references are an unacceptable drag on the performance of sending values then certainly synchronizing all events by default is off the table.

An intentional violation of the publisher contract? If so, I consider that a huge design smell. Contracts like backpressure should be taken seriously. If Combine needs to support publishers that do not support backpressure then the distinction should be modeled in the library. It is not ok to violate a semantic contract like this.

In addition to obeying demand, the Publisher contract also includes a requirement that publishers do not send values concurrently with other values, completion, or subscription. We needed to decide if subjects should enforce this thread safety requirement on their own (adding to overhead by locking) or deferring it to the user of the subject. Ultimately we decided the subject needed to lock on its own, because the subject can receive input itself from two sources - its own upstream or the send() function. That issue will be fixed in an upcoming seed and may be the root cause of this issue.I still filed a bug to check the behavior of the above test case.

To be clear: no publisher, including PassthroughSubject, should send more values than requested.

Thank you for clearing that up! Is the locking overhead significant?

It's not significant in the case where the lock is not contended. It does, however, mean a mutal exclusion to sending values. That's just inherent to the contract and the benefits of subscribers not having to deal with concurrent values is worth it.

1 Like

Thanks for confirming! I'm happy to hear that Apple's publishers are taking the contract seriously, including PassthroughSubject. It would be helpful if you document the behavior of the publishers in cases where they receive more values than requested. For example, when sent values exceed demand does PassthroughSubject drop values or does it buffer?

PassthroughSubject (and CurrentValueSubject) will drop received values if there is no downstream demand. Composition with buffer would provide a buffering behavior if one is needed.

2 Likes

Thanks for clarifying here. It would be great if it is possible to make this kind of semantic information clear in the documentation going forward.

Terms of Service

Privacy Policy

Cookie Policy