`multicast(_:)` vs `multicast(subject:)` expected behaviour

The docs for the multicast(_:) operator say:

In contrast with multicast(subject:), this method produces a publisher that creates a separate Subject for each subscriber.

With this in mind, one might expect that the provided closure would be executed for each subscriber – but this doesn't seem to be the case.

Here's a test which demonstrates the behaviour I'm expecting:

func testMulticastWithCreateSubjectClosureIsExecutedForEachSubscriber() {
    
    var cancellables = Set<AnyCancellable>()
    var createSubjectCallCount = 0
    let subject = Just(-1).multicast { () -> PassthroughSubject<Int, Never> in
        createSubjectCallCount += 1
        return .init()
    }
    
    XCTAssertEqual(createSubjectCallCount, 0)
    
    subject.sink { _ in }.store(in: &cancellables)
    
    XCTAssertEqual(createSubjectCallCount, 1)
    
    subject.sink { _ in }.store(in: &cancellables)
    
    XCTAssertEqual(createSubjectCallCount, 2) // XCTAssertEqual failed: ("1") is not equal to ("2")
}

Is this expected behaviour – or should I file a bug report?

cc/ @Tony_Parker, @Philippe_Hausler

Yep, I've noticed this too and already filed a bug report: FB7362069

It seems like this is a documentation issue rather than an implementation issue. The implementation makes sense: it lazily creates a single subject the first time one is needed.

Ahh, I see. But then how do we create the equivalent of 'reference counted' subject?

If it were to work the way I expected it to work, you could imagine using something like .multicast { SomeSubject<T, Never>() }.autoconnect().share() in a way that discards the subject when the last subscriber disconnects, and recreates it again when a new subscriber comes along.

Similar to the 'refCounted()' operator in other implementations.

somePublisher.makeConnectable().autoconnect() allows you to achieve the reference counting behavior, but I'm not sure it recreates the subject. Why do you need it though?
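For anyone following along, here is a minimal sketch of what connecting manually versus using autoconnect() looks like. The function name connectDemo and the string values are made up for illustration; only makeConnectable(), connect() and autoconnect() are Combine API.

```swift
import Combine

// Hypothetical demo: collects the values seen with a manual connect()
// and with autoconnect() on the same upstream subject.
func connectDemo() -> (manual: [String], auto: [String]) {
    let source = PassthroughSubject<String, Never>()

    // Manual: nothing flows downstream until connect() is called.
    var manual: [String] = []
    let connectable = source.makeConnectable()
    let manualSink = connectable.sink { manual.append($0) }
    source.send("dropped")                 // sent before connect(), never delivered
    let connection = connectable.connect()
    source.send("delivered")
    manualSink.cancel()
    connection.cancel()

    // autoconnect(): the connection is made when the first subscriber attaches.
    var auto: [String] = []
    let autoSink = source.makeConnectable().autoconnect().sink { auto.append($0) }
    source.send("delivered")
    autoSink.cancel()

    return (manual, auto)
}
```

Neither variant says anything about whether the underlying subject is recreated, which is the open question here.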

I have a ReplaySubject and an associated shareReplay(count:) operator to provide behaviour similar to that of other implementations.

However, whereas most other implementations would do something like:
multicast { ReplaySubject<Output, Failure>(maxBufferSize: maxBufferSize) }.refCount(), I'm using autoconnect() instead of refCount.

This means my share replay deviates from the behaviour of other implementations, which I assume chose that behaviour to prevent potentially expensive replay buffers sticking around after the last subscriber disappears.

I had hoped the multicast(_:) operator would provide a way around this by creating a new subject for each subscriber, which could then be shared with share() to get reference-counting behaviour.

Maybe, the solution is actually... a refCounted() operator in addition to autoconnect()!

The documentation perhaps needs some refinement here. It should probably read that it creates a subscription to the subject for every connected subscribe call. Share is effectively just a subject plus an autoconnected multicast.

By nature subjects are multicasters. They send values to all streams subscribed.
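To make the "subject plus autoconnected multicast" description concrete, here is a rough sketch of a share()-like operator built from those pieces. The names shareViaMulticast and shareDemo are hypothetical, and this is an approximation of the behaviour described above, not the actual implementation of share().

```swift
import Combine

extension Publisher {
    // Hypothetical helper: a PassthroughSubject-backed multicast with autoconnect(),
    // approximating what share() is described as doing.
    func shareViaMulticast() -> AnyPublisher<Output, Failure> {
        multicast { PassthroughSubject<Output, Failure>() }
            .autoconnect()
            .eraseToAnyPublisher()
    }
}

// Hypothetical demo: two subscribers, one upstream subscription.
func shareDemo() -> (subscriptions: Int, first: [Int], second: [Int]) {
    let source = PassthroughSubject<Int, Never>()
    var subscriptions = 0
    var first: [Int] = []
    var second: [Int] = []

    let shared = source
        .handleEvents(receiveSubscription: { _ in subscriptions += 1 })
        .shareViaMulticast()

    let a = shared.sink { first.append($0) }
    let b = shared.sink { second.append($0) }
    source.send(1)
    source.send(2)
    a.cancel()
    b.cancel()

    return (subscriptions, first, second)
}
```

Both sinks receive every value while the upstream is subscribed to only once, which is the multicasting behaviour the subject provides.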

Right, thanks for your response!

So really, if we want 'refCount()' behaviour for a ConnectablePublisher, there isn't a built-in way to do that?

I have found a workaround to get this behaviour by creating an operator exclusively for multicast publishers that simply grabs their 'upstream' and 'createSubject' values and then produces a new multicast publisher to be shared amongst downstream subscribers in a 'refCount' style manner – but it might be nice to have this handled by Combine.
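A rough sketch of what such a workaround could look like, in case it helps the discussion. This is not my exact code: the operator name refCount(), the lock-based bookkeeping and the demo function are all assumptions for illustration. It relies only on the public upstream and createSubject properties of Publishers.Multicast, and I haven't stress-tested it for thread safety.

```swift
import Combine
import Foundation

extension Publishers.Multicast {
    // Hypothetical operator, not part of Combine. Builds a fresh multicast
    // (and therefore a fresh subject) whenever the subscriber count goes 0 -> 1,
    // and drops it again when the last subscriber cancels or completes.
    func refCount() -> AnyPublisher<Upstream.Output, Upstream.Failure> {
        let upstream = self.upstream
        let createSubject = self.createSubject
        let lock = NSLock()
        var current: Publishers.Autoconnect<Publishers.Multicast<Upstream, SubjectType>>?
        var subscriberCount = 0

        return Deferred { () -> AnyPublisher<Upstream.Output, Upstream.Failure> in
            lock.lock()
            let shared = current ?? upstream.multicast(createSubject).autoconnect()
            current = shared
            subscriberCount += 1
            lock.unlock()

            // Each subscriber releases at most once, on completion or on cancel.
            var released = false
            let release: () -> Void = {
                lock.lock()
                defer { lock.unlock() }
                guard !released else { return }
                released = true
                subscriberCount -= 1
                if subscriberCount == 0 { current = nil } // discard the old subject
            }

            return shared
                .handleEvents(receiveCompletion: { _ in release() }, receiveCancel: release)
                .eraseToAnyPublisher()
        }
        .eraseToAnyPublisher()
    }
}

// Hypothetical demo: records how often createSubject has run at two points.
func refCountDemo() -> [Int] {
    let source = PassthroughSubject<Int, Never>()
    var created = 0
    let shared = source
        .multicast { () -> PassthroughSubject<Int, Never> in
            created += 1
            return PassthroughSubject<Int, Never>()
        }
        .refCount()

    var counts: [Int] = []
    let a = shared.sink { _ in }
    let b = shared.sink { _ in }
    counts.append(created)      // a and b share one subject
    a.cancel()
    b.cancel()
    let c = shared.sink { _ in }
    counts.append(created)      // a new subject was created for c
    c.cancel()
    return counts
}
```

Under these assumptions the closure runs once for the first two subscribers and once more after both have cancelled, so a replay buffer held by the subject would be released along with the last subscriber.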

If you could, it would be good to file an enhancement request with some details on what the operator would do, along with an explanation of the use case.
