Combine: what are those multicast functions for?

While struggling with some Combine problems, I came across the "Working with Multiple Subscribers" section in https://developer.apple.com/documentation/combine/publisher :

func multicast<S>(() -> S) -> Publishers.Multicast<Self, S>

func multicast<S>(subject: S) -> Publishers.Multicast<Self, S>

However, when I tried to confirm my assumption that multicast is needed to send values to multiple subscribers, I found that it is not necessary, at least in this playground code (run on macOS 10.14.5 in Xcode Version 11.0 beta 3 (11M362v)):

enum FormError: Error { }

let usernamePublisher = PassthroughSubject<String, FormError>()
let passwordPublisher = PassthroughSubject<String, FormError>()

let validatedCredentials = Publishers.CombineLatest(usernamePublisher, passwordPublisher)
    .map { (username, password) -> (String, String) in
        return (username, password)
    }
    .map { (username, password) -> Bool in
        !username.isEmpty && !password.isEmpty && password.count > 12
    }
    .eraseToAnyPublisher()

let firstSubscriber = validatedCredentials.sink { (valid) in
    print("First Subscriber: CombineLatest: Are the credentials valid: \(valid)")
}

let secondSubscriber = validatedCredentials.sink { (valid) in
    print("Second Subscriber: CombineLatest: Are the credentials valid: \(valid)")
}

// Nothing will be printed yet, as `CombineLatest` requires both publishers to have sent at least one value.
usernamePublisher.send("avanderlee")
passwordPublisher.send("weakpass")
passwordPublisher.send("verystrongpassword")

This prints:

First Subscriber: CombineLatest: Are the credentials valid: false
Second Subscriber: CombineLatest: Are the credentials valid: false
First Subscriber: CombineLatest: Are the credentials valid: true
Second Subscriber: CombineLatest: Are the credentials valid: true

So my question is: if multicast is not needed when sending to multiple subscribers, what is it for and how would I use it (example code welcome)?

Thanks,

Lars

Multicast is used if you want to control the starting point of the event emission.

It conforms to this protocol:
https://developer.apple.com/documentation/combine/connectablepublisher

This is similar to RxSwift's multicast operator:
http://reactivex.io/documentation/operators/publish.html

To give some context, multicast is used as a component to build up the .share() operator. In that case it is used to connect an upstream to a PassthroughSubject and is then auto-connected. Normally, when a Subscriber receives a Subscription, it will cancel any additional Subscriptions past the first. Multicast gives an escape hatch from this behavior and handles multiple Subscriptions.
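As a rough illustration of the "control the starting point" idea, here is a minimal sketch. The names source, multicasted, and log are made up for this example, and the handleEvents call is only there to make the moment of the upstream subscription visible.

```swift
import Combine

// Hypothetical names, for illustration only.
var log: [String] = []
let source = PassthroughSubject<Int, Never>()

// multicast(_:) returns a ConnectablePublisher: nothing is subscribed
// upstream until connect() is called.
let multicasted = source
    .handleEvents(receiveSubscription: { _ in log.append("upstream subscribed") })
    .multicast { PassthroughSubject<Int, Never>() }

let first = multicasted.sink { log.append("first: \($0)") }
let second = multicasted.sink { log.append("second: \($0)") }

source.send(1)                          // not connected yet, so nobody sees this value
let connection = multicasted.connect()  // the single upstream subscription starts here
source.send(2)                          // delivered to both subscribers

print(log)
```

Both subscribers attach first and only start receiving values once connect() is called, which is the explicit trigger that plain subscription does not give you.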

@DevAndArtist, @Philippe_Hausler Please excuse my ignorance, but I am not only new to Combine but also to the whole reactive programming topic. And because Combine is also very new it is somewhat difficult to find any stuff related to this on the web. Also, even Apple's docs are merely bare bones.

So I still wonder whether it is legal to connect more than one Subscriber to one Publisher, or whether this Multicast feature is needed. In my experiment in the code above it seems to work without Multicast, but this could be a bug or quirk. I am asking because in some other code (see "Using @Published in conjunction with @State in SwiftUI") I have the problem that a downstream Publisher never fires. This Publisher, named validatedCredentials, is connected to two other publishers, validatedEMail and validatedPassword, like this:

class RegistrationModel : BindableObject {
    @Published var eMail: String = ""
    @Published var password: String = ""
    @Published var passwordRepeat: String = ""

    public var didChange = PassthroughSubject<Void, Never>()

    var validatedEMail: AnyPublisher<String?, Never> {
        return $eMail
            .debounce(for: 0.5, scheduler: RunLoop.main)
            .removeDuplicates()
            .flatMap { username in
                return Future { promise in
                    self.usernameAvailable(username) { available in
                        promise(.success(available ? username : nil))
                    }
                }
            }
            .eraseToAnyPublisher()
    }

    var validatedPassword: AnyPublisher<String?, Never> {
        return Publishers.CombineLatest($password, $passwordRepeat)
            .debounce(for: 0.5, scheduler: RunLoop.main)
            .map { password, passwordRepeat in
                guard password == passwordRepeat, password.count > 5 else { return nil }
                return password
            }
            .eraseToAnyPublisher()
    }

    var validatedCredentials: AnyPublisher<(String, String)?, Never> {
        return Publishers.CombineLatest(validatedEMail, validatedPassword)
            .map { validatedEMail, validatedPassword in
                guard let eMail = validatedEMail, let password = validatedPassword else { return nil }
                return (eMail, password)
            }
            .eraseToAnyPublisher()
    }


    func usernameAvailable(_ username: String, completion: (Bool) -> Void) {
        let isValidEMailAddress: Bool = NSPredicate(format:"SELF MATCHES %@", "[A-Z0-9a-z._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,64}").evaluate(with: username)

        completion(isValidEMailAddress)
    }
}

Not shown here but maybe important: validatedEMail and validatedPassword are also connected to some SwiftUI Text-Views at the same time (just for debugging purposes).

So my question is: would I need some Multicast-Magic here to make it work? If so, how would I use this ConnectablePublisher? Would I just add a .makeConnectable() after the .eraseToAnyPublisher() and change the method signature accordingly? What else would be needed?

Thanks a lot,

Lars

I don't see any issues here at a quick glance. In general you don't need multicast at all. It's rarely used, likely only in very critical use cases where the emission of values must be explicitly triggered by you instead of starting automatically on subscription.

That said, forget multicast; your code looks okay using combineLatest. Remember that this operator waits until every input sequence has emitted at least one value; otherwise it won't proceed.

Since you are new to reactive programming and the Combine docs aren't very instructive, I would recommend reading up on how reactive operators behave in other libraries like RxSwift, since Combine is heavily inspired by such libraries. To help you with that, here is a cheat sheet that compares the operators and types between RxSwift and Combine: https://medium.com/gett-engineering/rxswift-to-apples-combine-cheat-sheet-e9ce32b14c5b

One more thing. I don't recommend using the flatMap operator because it's evil. It will break your neck at some point because of its non-intuitive behavior. flatMap may flatten values like flatMap does in the stdlib, but in a reactive framework there is more going on behind the scenes.

Let's say you flatten a very long async operation using flatMap and you want that operation to be canceled when the upstream emits a new value. Here comes the evil part: the flattened operation will run until it completes or fails. If you flatten an infinite stream (something that never completes), then flatMap will never complete, and neither will your whole sequence. Furthermore, flatMap does not guarantee any serialization. If you flatten sequences A and B, it's not guaranteed that A will emit first, followed by B. The order B then A is valid as well. It's also an operator that tends to create a lot of hard-to-find retain cycles and leaked memory.
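To make these points concrete, here is a small sketch that uses subjects as the flattened publishers so that no timing is involved. The names outer, first, second, received, and completed are made up for the example.

```swift
import Combine

// Hypothetical names, for illustration only.
let outer = PassthroughSubject<PassthroughSubject<String, Never>, Never>()
let first = PassthroughSubject<String, Never>()
let second = PassthroughSubject<String, Never>()

var received: [String] = []
var completed = false

let cancellable = outer
    .flatMap { $0 }
    .sink(receiveCompletion: { _ in completed = true },
          receiveValue: { received.append($0) })

outer.send(first)
outer.send(second)

second.send("B1")   // the newer inner publisher emits first
first.send("A1")    // the older one is still alive and still delivers

outer.send(completion: .finished)
first.send(completion: .finished)
// `second` never completes, so the flattened sequence never completes either.

print(received)   // ["B1", "A1"]
print(completed)  // false
```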

That said, beginners should NOT use the flatMap operator at all. To solve the above issues, RxSwift provides the flatMapLatest operator, which guarantees that the previously flattened sequence is invalidated and is very easy to use. If you want to invalidate the previously flattened sequence without emitting any further values, you can return an Empty publisher that completes immediately.

In Combine there is no single operator for flatMapLatest. You have to use map with a closure that returns a publisher rather than just its Output type, and then apply switchToLatest, which will do the trick here.
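A minimal sketch of that map plus switchToLatest pattern could look like this. The queries subject and the lookups dictionary are hypothetical stand-ins for whatever async work you would really start per value.

```swift
import Combine

// Hypothetical names, for illustration only.
let queries = PassthroughSubject<String, Never>()
var lookups: [String: PassthroughSubject<String, Never>] = [:]
var results: [String] = []

let cancellable = queries
    .map { query -> PassthroughSubject<String, Never> in
        // Stand-in for starting an async lookup for this query.
        let lookup = PassthroughSubject<String, Never>()
        lookups[query] = lookup
        return lookup
    }
    .switchToLatest()   // only the most recent inner publisher stays subscribed
    .sink { results.append($0) }

queries.send("a")
queries.send("ab")                      // supersedes the lookup for "a"

lookups["a"]?.send("result for a")      // ignored, this lookup was cancelled
lookups["ab"]?.send("result for ab")    // delivered

print(results)  // ["result for ab"]
```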

On the App Store there is an RxMarbles app that contains a list of most RxSwift operators and lets you play with the marble diagrams to understand the reactive operations better.

Time for a Combine Marbles App.
I hope Apple will provide world-class documentation for Combine, as the concepts are new to many and, as you pointed out with flatMap, sometimes surprising even to those who know the basics of reactive programming.

I also filed a feedback report (FB6186734) that Combine is missing a very critical operator, called concatMap in other frameworks (cc @Philippe_Hausler).

In the report I showcased one problem that exists with flatMap as mentioned above. Here is a small playground snippet to test it yourself.

import PlaygroundSupport
import Combine

let page = PlaygroundPage.current
page.needsIndefiniteExecution = true

let publisher = PassthroughSubject<Int, Never>()

let something = publisher
  .flatMap { value in
    Publishers.Sequence(sequence: [Int](repeating: value, count: value))
      .flatMap { value in
        Just(value).delay(
          for: .seconds(Double.random(in: 1 ... 4)),
          scheduler: RunLoop.main
        )
      }
  }
  .sink { value in
    print(value)
  }

Publishers.Sequence(sequence: [1, 2, 3, 4, 5, 6, 7, 8, 9]).subscribe(publisher)

DispatchQueue.main.asyncAfter(deadline: .now() + 1200) {
  something.cancel()
  page.finishExecution()
}

The order is now random because each flattened publisher emits at a different time. With concatMap, however, the order would always be the same and predictable, as the operator doesn't allow parallel execution of flattened publishers. It acts more like a queue that takes the incoming publishers one by one and waits until each of them completes before taking the next one.

Here is how RxSwift handles concatMap:

The expected order for concatMap would be:

1 2 2 3 3 3 4 4 4 4 5 5 5 5 5 6 6 6 6 6 6 7 7 7 7 7 7 7 8 8 8 8 8 8 8 8 9 9 9 9 9 9 9 9 9 
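One way to approximate this queue-like behavior with the existing operators is flatMap(maxPublishers: .max(1)), which limits flatMap to one flattened publisher at a time. This is only a sketch under the assumption that the upstream honors back pressure (a sequence publisher does, while a PassthroughSubject upstream drops values sent while there is no demand), so it is not a full concatMap replacement. The names a, b, and received are made up.

```swift
import Combine

// Hypothetical names, for illustration only.
let a = PassthroughSubject<String, Never>()
let b = PassthroughSubject<String, Never>()
var received: [String] = []

let cancellable = [a, b].publisher
    // Request only one inner publisher at a time from the upstream sequence.
    .flatMap(maxPublishers: .max(1)) { inner in inner }
    .sink { received.append($0) }

b.send("b1")                    // b is not subscribed yet, so this value is lost
a.send("a1")
a.send(completion: .finished)   // only now is b taken from the upstream
b.send("b2")

print(received)  // ["a1", "b2"]
```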

@IOOI and for other readers of this thread.

The most important takeaways from here:

  • Learn how sequences complete (super important), followed by how each operator behaves in terms of completion.
  • Learn how operators actually behave; don't judge them purely by their names:
    • flatMap does more in reactive frameworks than the stdlib collection operator of the same name.
    • combineLatest waits until each sequence has emitted at least one value.
    • the differences between zip and combineLatest
    • etc. etc.
  • Learn how sequences fail and how to recover from an upstream failure.
  • New in Combine is the concept of back pressure (I have to learn it myself as well).
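As a quick illustration of the zip versus combineLatest point in the list above, here is a small sketch. The subject names are made up, and the outputs are collected as strings only so that they are easy to compare.

```swift
import Combine

// Hypothetical names, for illustration only.
let numbers = PassthroughSubject<Int, Never>()
let letters = PassthroughSubject<String, Never>()
var combined: [String] = []
var zipped: [String] = []

// combineLatest pairs the latest value of each side on every new value.
let c1 = numbers.combineLatest(letters)
    .sink { combined.append("\($0.0)\($0.1)") }

// zip pairs the first with the first, the second with the second, and so on.
let c2 = numbers.zip(letters)
    .sink { zipped.append("\($0.0)\($0.1)") }

numbers.send(1)
numbers.send(2)
letters.send("x")
letters.send("y")
numbers.send(3)

print(combined)  // ["2x", "2y", "3y"]
print(zipped)    // ["1x", "2y"]
```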

Marble diagrams are your friends.

I changed my code to:

var validatedEMail: AnyPublisher<String?, Never> {
    return $eMail
        .debounce(for: 0.5, scheduler: RunLoop.main)
        .removeDuplicates()
        .map { username in
            return Future { promise in
                self.usernameAvailable(username) { available in
                    promise(.success(available ? username : nil))
                }
            }
        }
        .switchToLatest()
        .eraseToAnyPublisher()
}

But this didn't fix the issue: the validatedEMail Publisher still works as expected, but the validatedCredentials Publisher still doesn't fire even when both of the other Publishers have fired.

So I opened

https://feedbackassistant.apple.com/feedback/6574579

@Philippe_Hausler

@Philippe_Hausler are you allowed to comment on this issue? I don't have many uses of this operator in RxSwift, but I have a few and they are considered critical, one of which manages firmware updates. I would like to migrate to Combine at some point, but I struggle to come up with a way to achieve the same behavior by chaining reactive streams using the existing operators.