Combine: what are those multicast functions for?

I also filed a feedback report, FB6186734, noting that Combine is missing a critical operator known as concatMap in other frameworks (cc @Philippe_Hausler).

In the report I showcased one problem with flatMap, as mentioned above. Here is a small playground snippet so you can test it yourself.

import PlaygroundSupport
import Combine

let page = PlaygroundPage.current
page.needsIndefiniteExecution = true

let publisher = PassthroughSubject<Int, Never>()

let something = publisher
  .flatMap { value in
    Publishers.Sequence(sequence: [Int](repeating: value, count: value))
      .flatMap { value in
        Just(value).delay(
          for: .seconds(Double.random(in: 1 ... 4)),
          scheduler: RunLoop.main
        )
      }
  }
  .sink { value in
    print(value)
  }

Publishers.Sequence(sequence: [1, 2, 3, 4, 5, 6, 7, 8, 9]).subscribe(publisher)

DispatchQueue.main.asyncAfter(deadline: .now() + 1200) {
  something.cancel()
  page.finishExecution()
}

The order is now random because each flattened publisher emits at a different time. With concatMap, however, the order would always be the same and predictable, since that operator doesn't allow parallel execution of flattened publishers. It acts more like a queue, merging incoming publishers one by one and waiting until each completes before taking the next.

Here is how RxSwift handles concatMap (marble diagram not reproduced here):

The expected order for concatMap would be:

1 2 2 3 3 3 4 4 4 4 5 5 5 5 5 6 6 6 6 6 6 7 7 7 7 7 7 7 8 8 8 8 8 8 8 8 9 9 9 9 9 9 9 9 9 
3 Likes

@IOOI and for other readers of this thread.

The most important takeaways from here:

  • Learn how sequences complete (super important), followed by how each operator behaves in terms of completion.
  • Learn how operators behave, not purely by their names:
    • In reactive frameworks, flatMap does more than the stdlib collection operator of the same name.
    • combineLatest waits until each sequence has emitted at least one value.
    • the differences between zip and combineLatest
    • etc.
  • Learn how sequences fail and how to recover from an upstream failure.
  • New in Combine is the concept of back pressure (I still have to learn it myself).

Marble diagrams are your friends.
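
To illustrate the zip vs combineLatest point above, here is a minimal sketch (subject names and values are made up for the example) showing how the two operators pair values differently:

```swift
import Combine

let numbers = PassthroughSubject<Int, Never>()
let letters = PassthroughSubject<String, Never>()

var zipped: [String] = []
var combined: [String] = []

// zip pairs the n-th value of one upstream with the n-th of the other.
let zipSub = numbers.zip(letters)
    .sink { zipped.append("\($0.0)\($0.1)") }

// combineLatest waits until each upstream has emitted at least once,
// then emits the latest value from each on every new upstream value.
let combineSub = numbers.combineLatest(letters)
    .sink { combined.append("\($0.0)\($0.1)") }

numbers.send(1)
numbers.send(2)   // combineLatest emits nothing yet: letters is still silent
letters.send("a") // zip pairs 1 with "a"; combineLatest pairs the latest 2 with "a"
letters.send("b") // zip pairs 2 with "b"; combineLatest pairs 2 with "b"

// zipped   == ["1a", "2b"]
// combined == ["2a", "2b"]
```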

7 Likes

I changed my code to:

var validatedEMail: AnyPublisher<String?, Never> {
    return $eMail
        .debounce(for: 0.5, scheduler: RunLoop.main)
        .removeDuplicates()
        .map { username in
            Future { promise in
                self.usernameAvailable(username) { available in
                    promise(.success(available ? username : nil))
                }
            }
        }
        .switchToLatest()
        .eraseToAnyPublisher()
}

but this didn't fix the issue: the validatedEMail publisher still works as expected, but the validatedCredentials publisher still doesn't fire even when both other publishers have.

So I opened

https://feedbackassistant.apple.com/feedback/6574579

@Philippe_Hausler

@Philippe_Hausler are you allowed to comment on this issue? I don't have many uses of this operator in RxSwift, but I have a few, and they are critical; one of them manages firmware updates. I would like to migrate to Combine at some point, but I struggle to come up with a way to achieve the same behavior by chaining reactive streams with the existing operators.

Xcode 11 beta 4 came out a few days ago. So I thought: new game, new chance, and retried my code (after some small adaptations for the latest version of Combine):

//
//  RegistrationView.swift
//  Combine-Beta-Feedback
//
//  Created by Lars Sonchocky-Helldorf on 09.07.19.
//  Copyright © 2019 Lars Sonchocky-Helldorf. All rights reserved.
//

import SwiftUI
import Combine

struct RegistrationView : View {
    @ObjectBinding var registrationModel = RegistrationModel()
    
    @State private var registrationButtonDisabled = true
    
    @State private var validatedEMail: String = ""
    @State private var validatedPassword: String = ""
    
    var body: some View {
        Form {
            Section {
                TextField("Enter your EMail", text: $registrationModel.eMail)
                SecureField("Enter a Password", text: $registrationModel.password)
                SecureField("Enter the Password again", text: $registrationModel.passwordRepeat)
                Button(action: registrationButtonAction) {
                    Text("Create Account")
                }
                .disabled($registrationButtonDisabled.value)
                .onReceive(self.registrationModel.validatedCredentials) { newValidatedCredentials in
                    self.registrationButtonDisabled = (newValidatedCredentials == nil)
                }
            }
            
            Section {
                Text("Validated EMail: \(validatedEMail)")
                    .onReceive(self.registrationModel.validatedEMail) { newValidatedEMail in
                        self.validatedEMail = newValidatedEMail ?? "EMail invalid"
                    }
                Text("Validated Password: \(validatedPassword)")
                    .onReceive(self.registrationModel.validatedPassword) { newValidatedPassword in
                        self.validatedPassword = newValidatedPassword ?? "Passwords too short or don't match"
                    }
            }
        }
        .navigationBarTitle(Text("Sign Up"))
    }
    
    func registrationButtonAction() {
        
    }
}

class RegistrationModel : BindableObject {
    
    @Published var eMail: String = ""
    @Published var password: String = ""
    @Published var passwordRepeat: String = ""
    
    public var willChange = PassthroughSubject<Void, Never>()
    
    var validatedEMail: AnyPublisher<String?, Never> {
        return $eMail
            .debounce(for: 0.5, scheduler: RunLoop.main)
            .removeDuplicates()
            .map { username in
                Future { promise in
                    print("username: \(username)")
                    self.usernameAvailable(username) { available in
                        promise(.success(available ? username : nil))
                    }
                }
            }
            .switchToLatest()
            .eraseToAnyPublisher()
    }
    
    var validatedPassword: AnyPublisher<String?, Never> {
        return Publishers.CombineLatest($password, $passwordRepeat)
            .debounce(for: 0.5, scheduler: RunLoop.main)
            .map { password, passwordRepeat in
                print("password: \(password), passwordRepeat: \(passwordRepeat)")
                guard password == passwordRepeat, password.count > 5 else { return nil }
                return password
            }
            .eraseToAnyPublisher()
    }
    
    var validatedCredentials: AnyPublisher<(String, String)?, Never> {
        return Publishers.CombineLatest(validatedEMail, validatedPassword)
            .map { validatedEMail, validatedPassword in
                print("validatedEMail: \(validatedEMail ?? "not set"), validatedPassword: \(validatedPassword ?? "not set")")
                guard let eMail = validatedEMail, let password = validatedPassword else { return nil }
                return (eMail, password)
            }
            .eraseToAnyPublisher()
    }
    
    
    func usernameAvailable(_ username: String, completion: (Bool) -> Void) {
        let isValidEMailAddress: Bool = NSPredicate(format:"SELF MATCHES %@", "[A-Z0-9a-z._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,64}").evaluate(with: username)
        
        completion(isValidEMailAddress)
    }
}

#if DEBUG
struct RegistrationView_Previews : PreviewProvider {
    static var previews: some View {
        RegistrationView()
    }
}
#endif

Running this, I now get:

username: 
username: 
password: , passwordRepeat: 
username: 
password: , passwordRepeat: 
username: 
username: 
username: test
username: test
username: test@
username: test@
username: test@test.com
username: test@test.com
password: , passwordRepeat: 
username: test@test.com
username: test@test.com
2019-07-22 16:54:36.302558+0200 Combine-Beta-Feedback[21023:3503157] [Assert] Error in UIKit client: -[UIWindow setScreen:] should not be called if the client adopts UIScene lifecycle. Call -[UIWindow setWindowScene:] instead.
2019-07-22 16:54:36.391008+0200 Combine-Beta-Feedback[21023:3503373] [AutoFill] Cannot show Automatic Strong Passwords for app bundleID: de.silpion.Combine-Beta-Feedback due to error: iCloud Keychain is disabled
2019-07-22 16:54:36.841545+0200 Combine-Beta-Feedback[21023:3503157] [Common] _BSMachError: port 7507; (os/kern) invalid capability (0x14) "Unable to insert COPY_SEND"
password: tester, passwordRepeat: 
2019-07-22 16:54:42.943416+0200 Combine-Beta-Feedback[21023:3503376] [AutoFill] Cannot show Automatic Strong Passwords for app bundleID: de.silpion.Combine-Beta-Feedback due to error: iCloud Keychain is disabled
2019-07-22 16:54:42.943549+0200 Combine-Beta-Feedback[21023:3503157] [Assert] Error in UIKit client: -[UIWindow setScreen:] should not be called if the client adopts UIScene lifecycle. Call -[UIWindow setWindowScene:] instead.
password: tester, passwordRepeat: tester
password: tester, passwordRepeat: tester
username: test@test.com
username: test@test.com

and that's it. The line:

print("validatedEMail: \(validatedEMail ?? "not set"), validatedPassword: \(validatedPassword ?? "not set")")

is never reached. Any ideas? @Philippe_Hausler

Hi @DevAndArtist,

I've got that bug in my queue now, and I think the behavior you're seeing is about the PassthroughSubject, not flatMap. PassthroughSubject will drop values if the downstream has not made any demand for them. Because flatMap with maxPublishers: .max(1) only requests one value at a time from upstream, and due to the delay, all values other than the first one are dropped.

This works (please forgive my slight reformatting as I adjusted this to fit into my test harness):

        let something = [1, 2, 3, 4, 5, 6, 7, 8, 9].publisher
            .flatMap(maxPublishers: .max(1)) { value in
                [Int].init(repeating: value, count: value).publisher
                    .flatMap { value in
                        Just(value)
                            .delay(for: .seconds(Double.random(in: 1 ... 4)), scheduler: RunLoop.main)
                    }
            }
            .sink { value in
                print(value)
            }
                
        DispatchQueue.main.asyncAfter(deadline: .now() + 300) {
            something.cancel()
        }
        RunLoop.main.run(until: Date() + 400)

or this:

        let subject = PassthroughSubject<Int, Never>()
        let something = subject
            .buffer(size: 9, prefetch: .byRequest, whenFull: .dropOldest)
            .flatMap(maxPublishers: .max(1)) { value in
                [Int].init(repeating: value, count: value).publisher
                    .flatMap { value in
                        Just(value)
                            .delay(for: .seconds(Double.random(in: 1 ... 4)), scheduler: RunLoop.main)
                    }
            }
            .sink { value in
                print(value)
            }
        
        let c = [1, 2, 3, 4, 5, 6, 7, 8, 9].publisher.subscribe(subject)
        
        DispatchQueue.main.asyncAfter(deadline: .now() + 300) {
            something.cancel()
        }
        RunLoop.main.run(until: Date() + 400)
        c.cancel()

I haven't yet checked the rest of the comparison to concatMap you put in the bug (thank you for all of that).

2 Likes

Hi @Tony_Parker, thank you for tackling this issue. I must admit that I'm not familiar with the concept of back pressure yet; it does not exist in the context of RxSwift. Therefore my assumptions about the behavior of reactive streams might be slightly off from how they are defined in Combine.

I eventually tried to build my own ConcatMap publisher, but I failed at several points:

  • Mine only operates on finite upstreams, which is totally wrong.
  • Instead of switchToLatest, which does not work, I had to use flatMap again.

This example produces the expected output, but it's still not fully correct as just mentioned.

extension Publishers {
  public struct ConcatMap<NewPublisher, Upstream>:
    Publisher
    where
    NewPublisher: Publisher,
    Upstream: Publisher,
    NewPublisher.Failure == Upstream.Failure
  {
    public typealias Output = NewPublisher.Output
    public typealias Failure = Upstream.Failure
    
    public let upstream: Upstream
    public let transform: (Upstream.Output) -> NewPublisher
    
    public init(
      upstream: Upstream,
      transform: @escaping (Upstream.Output) -> NewPublisher
    ) {
      self.upstream = upstream
      self.transform = transform
    }
    
    public func receive<S>(subscriber: S)
      where
      S: Subscriber,
      NewPublisher.Output == S.Input,
      Upstream.Failure == S.Failure
    {
      let transform = self.transform
      let start = Empty<Output, Failure>(completeImmediately: true)
        .eraseToAnyPublisher()
      // This implementation is meh, because it requires `upstream` to be
      // finite and complete first before we can start appending.
      upstream
        .map { output -> AnyPublisher<Output, Failure> in
          transform(output).eraseToAnyPublisher()
        }
        .reduce(start) { result, publisher in
          result.append(publisher).eraseToAnyPublisher()
        }
        // I don't know why flat map is needed here.
        .flatMap { $0 }
        // I thought `switchToLatest`, like `flatMapLatest` from RxSwift, is
        // what I need here to kick off everything, but it does not work.
//        .switchToLatest()
        .receive(subscriber: subscriber)
    }
  }
}

extension Publisher {
  public func concatMap<T, P>(
    _ transform: @escaping (Self.Output) -> P
  ) -> Publishers.ConcatMap<P, Self>
    where
    T == P.Output,
    P: Publisher,
    Self.Failure == P.Failure
  {
    return Publishers.ConcatMap(upstream: self, transform: transform)
  }
}

let page = PlaygroundPage.current
page.needsIndefiniteExecution = true

let publisher = PassthroughSubject<Int, Never>()

let something = publisher
  .concatMap { value in
    Publishers.Sequence(sequence: [Int](repeating: value, count: value))
      .flatMap { value in
        Just(value).delay(
          for: .seconds(Double.random(in: 1 ... 4)),
          scheduler: RunLoop.main
        )
      }
  }
  .sink { value in
    print(value)
  }

Publishers.Sequence(sequence: [1, 2, 3, 4, 5, 6, 7, 8, 9]).subscribe(publisher)

DispatchQueue.main.asyncAfter(deadline: .now() + 120) {
  something.cancel()
  page.finishExecution()
}

I think in the case of concatMap we really shouldn't make any assumptions about the upstream. I linked a few articles related to the operator, which also mention that concatMap isn't efficient for async work: if the upstream fires rapidly while concatMap processes everything more slowly, preserving order and waiting until each merged publisher completes, the processing time may grow very fast. The made-up example above is perfect for that scenario, but it's exactly what I expect from this operator.

In our app I only have two usages of concatMap. The first performs firmware updates in sequence, handles errors if any happen, retries if needed, and then processes any pending firmware updates. The second queues Bluetooth events and transforms them into UI events that are synced at a specific time interval; if events arrive faster, they are enqueued and then processed when the interval ticks.


This leads me to the question: does .buffer(size: ..., prefetch: .byRequest, ...) act like a queue, or are all buffered elements always kept in the buffer?

I rewrote the ConcatMap operator using buffer and flatMap(maxPublishers:), but I don't know whether Buffer will eventually explode if it's not acting like a FIFO queue.

extension Publishers {
  public struct ConcatMap<NewPublisher, Upstream>:
    Publisher
    where
    NewPublisher: Publisher,
    Upstream: Publisher,
    NewPublisher.Failure == Upstream.Failure
  {
    public typealias Output = NewPublisher.Output
    public typealias Failure = Upstream.Failure
    
    public let upstream: Upstream
    public let transform: (Upstream.Output) -> NewPublisher
    
    public init(
      upstream: Upstream,
      transform: @escaping (Upstream.Output) -> NewPublisher
    ) {
      self.upstream = upstream
      self.transform = transform
    }
    
    public func receive<S>(subscriber: S)
      where
      S: Subscriber,
      NewPublisher.Output == S.Input,
      Upstream.Failure == S.Failure
    {
      upstream
        // - Is this potentially a FIFO queue?
        // - I hope it uses a (dynamic) Array so it can grow and 
        //   shrink in size as needed.
        .buffer(size: .max, prefetch: .byRequest, whenFull: .dropNewest)
        .flatMap(maxPublishers: .max(1), transform)
        .receive(subscriber: subscriber)
    }
  }
}

1 Like

Hi @DevAndArtist,

Sorry about the late replies, things are pretty crazy right now. :slight_smile:

Buffer has a few behaviors depending on its configuration. It should not cause a fatal error or abort (assuming that's what you mean by explode). It is possible to configure it to send an error if it fills up.

Here are the options:

    //       keepFull
    //           subscribe(subscription): starts with demand of buffer size
    //           afterwards will use upstream demand to try to keep the buffer as full as possible
    //       byRequest
    //           simply forwards downstreams demand requests to upstream

    public enum PrefetchStrategy {
        case keepFull
        case byRequest
    }

    public enum BufferingStrategy<Failure: Swift.Error> {
        case dropNewest
        case dropOldest
        case customError(() -> Failure)
    }

Here I took advantage of the fact that I knew that your upstream would only have a max of 9 elements and created a buffer to hold all 9. The Sequence Publisher will go ahead and send all 9 to the buffer, since it was created with a keepFull strategy. The flatMap then slowly drains it, one at a time.

1 Like

By 'slowly drains it', do you mean in a FIFO way or LIFO (if we choose dropNewest)? So theoretically, if flatMap drains the buffer fast enough, it may never reach its max size (that's what I meant by explosion).

Does the buffer preallocate capacity? What happens if I set the max buffer size to Int.max?


I just realized that in my implementation above it should probably be dropOldest, not dropNewest, because I need FIFO.

Edit: wrong, it's for error handling; I guess I'm already tired.

buffer only supports emitting the elements in the order received (at least for now). The drop strategy controls which elements are lost, not the order in which they are re-transmitted downstream. So we don't do LIFO, exactly.

You are correct that if flatMap drains the buffer fast enough, it will not reach its max size.

Right now (although this is an implementation detail and subject to change), buffer just stores the values as a plain old Swift array and checks the current size vs the max capacity specified. We may in the future preallocate space, so it may not be a great idea to ask for Int.max. Although, frankly, we'd probably have to special case that value anyway.

3 Likes

Okay, gotcha. It would be great if we had an option where the buffer size is unbounded and grows dynamically as needed (maybe some linked-list style). I would expect such behavior from a concatMap operator, where the only limitation is how many hardware resources we can get, although I don't fully remember how RxSwift does it; they may actually preallocate a buffer as well.

So theoretically I can now build my own operator that behaves the way I'd expect, which is great; thank you for helping me figure that out. You can either close the ticket or add a ConcatMap type to Combine now. :smirk::see_no_evil:

No problem, glad to help. I'm keeping the bug for now because I would like to gather some additional feedback from others before deciding if we want to add the more explicit name or just refer people to this pattern explicitly.

1 Like

Hi Tony,

Can you please expand on the keepFull prefetch strategy? I understand the initial demand equal to the buffer size, but I don't understand what it means "to use upstream demand to try to keep the buffer as full as possible". More specifically, I'm perplexed by the notion of "upstream demand", as I thought demand was a concept that applied to the downstream subscriber.

Cheers,
-Patrick

I agree the word upstream in that comment is confusing. Or maybe just wrong.

What keepFull does is to request values from upstream itself in order to attempt to hold N values in the buffer. In other words, when downstream comes along and takes values from the buffer, the buffer will request that number of values from upstream.

1 Like

I've been doing some experimenting and found this to be the case. I would like to confirm something, though. I noticed it starts by requesting a demand equal to the size of the buffer. However, I also observed that it requests additional demand based on the demand requests received from the downstream subscriber. Is this indeed the case? Will this additional demand be equal to the downstream demand?

I will double check, but the idea here is that if the downstream can hold N values and the buffer has size M open, and the strategy is keepFull, then we should demand N + M from upstream so as to satisfy the downstream plus keep the buffer full.

1 Like

Furthermore, flatMap does not guarantee any serialization. If you flatten sequences A and B, it's not guaranteed that A will emit first, followed by B; the order B then A is valid as well. It's also an operator that can create a lot of hard-to-find retain cycles and leaked memory.

I disagree! Please see https://stackoverflow.com/questions/59743938/combine-framework-serialize-async-operations/59889174#59889174

Can you please clarify what you disagree with? It's hard to guess if you just throw a big chunk of code at me. I showed an example upthread demonstrating that the general flatMap won't guarantee serialization.

I disagree with that exact sentence. flatMap has a parameter maxPublishers with a default value of Subscribers.Demand.unlimited. Just set it to Subscribers.Demand.max(1) and you have perfect serialization. Check the link and see the examples there.

1 Like

I think he disagrees with the claim that “flatMap does not guarantee any serialization”.

Apparently (judging from that Stack Overflow answer) if you call flatMap(maxPublishers: .max(1)), flatMap only passes .max(1) demand upstream, so it will only get a single “inner” publisher to subscribe to. Then flatMap waits for that inner publisher to finish before passing another .max(1) demand upstream. So flatMap(maxPublishers: .max(1)) is only ever subscribed to one inner publisher at a time. It therefore concatenates the sequences of values from the inner publishers, instead of intermingling them.

This is a really useful insight that is not at all obvious from the documentation. I look forward to the day Combine has complete documentation specifying precisely what all its many operators actually do.

6 Likes