Combine: what are those multicast functions for?

Struggling with some combine problems I came across the "Working with Multiple Subscribers" section in Apple Developer Documentation :

func multicast<S>(() -> S) -> Publishers.Multicast<Self, S>

func multicast<S>(subject: S) -> Publishers.Multicast<Self, S>

However, when I tried to confirm my assumption that multicast would be needed when sending to multiple subscribers, I found out this is not necessary when trying on this playground code (run on 10.14.5 in Xcode Version 11.0 beta 3 (11M362v)):

enum FormError: Error { }

let usernamePublisher = PassthroughSubject<String, FormError>()
let passwordPublisher = PassthroughSubject<String, FormError>()

let validatedCredentials = Publishers.CombineLatest(usernamePublisher, passwordPublisher)
    .map { (username, password) -> (String, String) in
        return (username, password)
    }
    .map { (username, password) -> Bool in
        !username.isEmpty && !password.isEmpty && password.count > 12
    }
    .eraseToAnyPublisher()

let firstSubscriber = validatedCredentials.sink { (valid) in
    print("First Subscriber: CombineLatest: Are the credentials valid: \(valid)")
}

let secondSubscriber = validatedCredentials.sink { (valid) in
    print("Second Subscriber: CombineLatest: Are the credentials valid: \(valid)")
}

// Nothing will be printed yet as `CombineLatest` requires both publishers to have send at least one value.
usernamePublisher.send("avanderlee")
passwordPublisher.send("weakpass")
passwordPublisher.send("verystrongpassword")

this prints:

First Subscriber: CombineLatest: Are the credentials valid: false
Second Subscriber: CombineLatest: Are the credentials valid: false
First Subscriber: CombineLatest: Are the credentials valid: true
Second Subscriber: CombineLatest: Are the credentials valid: true

So my question is: if multicast ist not needed when sending to multiple subscribers what is it for and how would I use it (example code welcome)?

Thanks,

Lars

3 Likes

Multicast is used if you want to control the starting point of the event emission.

It conforms to this protocol:
https://developer.apple.com/documentation/combine/connectablepublisher

This is similar to RxSwift‘s multicast operator:
http://reactivex.io/documentation/operators/publish.html

To give some context, multicast is used as a component to build up the .share() operator. In that case it used to connect an upstream to a PassthroughSubject and then is auto connected. Normally when a Subscriber receives a Subscription it will cancel any additional Subscriptions past the first, Multicast gives an escape hatch to this behavior and handles multiple Subscriptions.

@DevAndArtist, @Philippe_Hausler Please excuse my ignorance, but I am not only new to Combine but also to the whole reactive programming topic. And because Combine is also very new it is somewhat difficult to find any stuff related to this on the web. Also, even Apple's docs are merely bare bones.

So I still wonder if it is legal to connect more than one Subscriber to one Publisher or whether this Multicast feature is needed. In my experiment in the code above it seems to work without Multicast, but this could be a bug or quirk. I am asking because in some other code here: Crash in SwiftUI App using Combine (was: Using @Published in conjunction with @State in SwiftUI) - #5 by IOOI I have the problem, that some subsequent Publisher never fires. This Publisher, named validatedCredentials is connected to two other publishers, validatedEMail and validatedPassword like this:

class RegistrationModel : BindableObject {
    @Published var eMail: String = ""
    @Published var password: String = ""
    @Published var passwordRepeat: String = ""

    public var didChange = PassthroughSubject<Void, Never>()

    var validatedEMail: AnyPublisher<String?, Never> {
        return $eMail
            .debounce(for: 0.5, scheduler: RunLoop.main)
            .removeDuplicates()
            .flatMap { username in
                return Future { promise in
                    self.usernameAvailable(username) { available in
                        promise(.success(available ? username : nil))
                    }
                }
        }
        .eraseToAnyPublisher()
    }

    var validatedPassword: AnyPublisher<String?, Never> {
        return Publishers.CombineLatest($password, $passwordRepeat)
            .debounce(for: 0.5, scheduler: RunLoop.main)
            .map { password, passwordRepeat in
                guard password == passwordRepeat, password.count > 5 else { return nil }
                return password
        }
        .eraseToAnyPublisher()
    }

    var validatedCredentials: AnyPublisher<(String, String)?, Never> {
        return Publishers.CombineLatest(validatedEMail, validatedPassword)
            .map { validatedEMail, validatedPassword in
                guard let eMail = validatedEMail, let password = validatedPassword else { return nil }
                return (eMail, password)
        }
        .eraseToAnyPublisher()
    }


    func usernameAvailable(_ username: String, completion: (Bool) -> Void) {
        let isValidEMailAddress: Bool = NSPredicate(format:"SELF MATCHES %@", "[A-Z0-9a-z._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,64}").evaluate(with: username)

        completion(isValidEMailAddress)
    }
}

Not shown here but maybe important: validatedEMail and validatedPassword are also connected to some SwiftUI Text-Views at the same time (just for debugging purposes).

So my question is: would I need some Multicast-Magic here to make it work? If so, how would I use this ConnectablePublisher? Would I just add a .makeConnectable() after the .eraseToAnyPublisher() and change the method signature accordingly? What else would be needed?

Thanks a lot,

Lars

I don‘t see any issues here at a quick glance. In general you don‘t need multicast at all, it‘s rarely used, likely only in very critical use cases where the emission of values must be explicitly triggered by you instead of the automatic behavior during subscription.

That said, forget multicast, your code looks okay using combineLatest. Remember that this operator will wait until every parameter sequence emitted at least one value, otherwise it won‘t proceed.

Since you are new to reactive programming and Combine docs aren‘t very teaching I would recommend to read the behavior about reactive operators from other libraries like RxSwift, since Combine is heavily inspired by such libraries. To help you with that here is a cheat sheet that compares the operators and types between RxSwift and Combine: RxSwift to Apple’s Combine “Cheat Sheet” | by Shai Mishali | Gett Tech | Medium

One more thing. I don‘t recommend using flatMap operator because it‘s evil. It will break your neck at some point because of its non-intuitive behavior. flatMap may flatten some value like flatMap does in the stdlib, but in a reactive framework there is more going on behind the scenes.

Let‘s say you flatten an async and very long operation using flatMap, however if you want that operation to be canceled when the upstream emits a new value, well here comes the evil part. The flattened operation will run until it completes or fails. If you flattened an infinite stream (something that never completes), then flatMap will never complete and nor will your whole sequence ever complete. Furthermore flatMap does not guarantee any serialization. If you flattened A and B sequences, it‘s not guaranteed that A will emit first and then followed by B. The order B then A is valid as well. It‘s also an operator that will create a lot of hard to find retain cycles and leaked memory.

That said beginners should NOT use flatMap operator at all. To solve all the above issues RxSwift introduces flatMapLatest operator, which guarantees invalidation of previously flattened sequence and very easy to drive. If you also want to invalidate the previously flattened sequence and not emit any further values you can use Empty that completes immediately.

In Combine there is no single operator for flatMapLatest, you have to use map which returns a publisher not just its Output type, and then use switchToLatest which will the trick here.

On the App Store there is a RxMarbles app that contains a list of most RxSwift operators and lets you play with the marble diagrams to understand the reactive operations better.

3 Likes

Time for a Combine Marbles App.
I hope Apple will make a world class documentation for Combine as the concepts are new to many and - as you pointed out with flatMap - sometimes surprising even to those who know the basics of reactive programming.

9 Likes

I also filed a feedback report that Combine is missing a very critical operator called concatMap in other frameworks (cc @Philippe_Hausler) FB6186734.

In the report I showcased one problem that exists with flatMap as mentioned above. Here is a small playground snippet to test it yourself.

import PlaygroundSupport
import Combine

let page = PlaygroundPage.current
page.needsIndefiniteExecution = true

let publisher = PassthroughSubject<Int, Never>()

let something = publisher
  .flatMap { value in
    Publishers.Sequence(sequence: [Int](repeating: value, count: value))
      .flatMap { value in
        Just(value).delay(
          for: .seconds(Double.random(in: 1 ... 4)),
          scheduler: RunLoop.main
        )
      }
  }
  .sink { value in
    print(value)
  }

Publishers.Sequence(sequence: [1, 2, 3, 4, 5, 6, 7, 8, 9]).subscribe(publisher)

DispatchQueue.main.asyncAfter(deadline: .now() + 1200) {
  something.cancel()
  page.finishExecution()
}

The order of now random because each flattened publisher emits at a different time. However with concatMap this would be different and the order would be always the same and predictable, as the operator don't allow parallel execution of flattened publishers and acts more like a queue that merges every incoming publisher one by one until each of them completes before taking the next one.

Here is how RxSwift handles concatMap:

The expected order for concatMap would be:

1 2 2 3 3 3 4 4 4 4 5 5 5 5 5 6 6 6 6 6 6 7 7 7 7 7 7 7 8 8 8 8 8 8 8 8 9 9 9 9 9 9 9 9 9 
3 Likes

@IOOI and for other readers of this thread.

The most important takeaway from here:

  • Learn how sequences complete (super important), followed by how each operator behaves in terms of completion.
  • Learn how operators behave, not purely just by it's name
    • flatMap does more than stdlib collection operator in reactive frameworks.
    • combineLatest waits until each sequence emitted at least one value.
    • differences between zip and combineLatest
    • etc. etc.
  • How sequences fail and how to recover from an upstream failure.
  • New in Combine is the concept of back pressure (I have to learn it myself as well).

Marble diagrams are your friends.

7 Likes

I changed my code to:

var validatedEMail: AnyPublisher<String?, Never> {
    return $eMail
        .debounce(for: 0.5, scheduler: RunLoop.main)
        .removeDuplicates()
        .map { username in
            return Future { promise in
                self.usernameAvailable(username) { available in
                    promise(.success(available ? username : nil))
                }
            }
    }
    .switchToLatest()
        .eraseToAnyPublisher()
}

but this didn't help the issue: the validatedEMail Publisher still works as expected but the validatedCredentials Publisher still doesn't fire if both other Publishers did.

So I opened

https://feedbackassistant.apple.com/feedback/6574579

@Philippe_Hausler

@Philippe_Hausler are you allowed to comment on this issue? I don‘t have many uses in RxSwift of this operator but I have a few and they are considered critical, one of which manages firmware updates. I would like at some point migrate to Combine, but I struggle to come up with a way to achieve the same behavior through chaining reactive streams using the existing operators.

Xcode-beta 11 beta 4 came out some days ago. So I thought: new game, new chance and retried my code (after some small adaptions for the latest version of combine):

//
//  RegistrationView.swift
//  Combine-Beta-Feedback
//
//  Created by Lars Sonchocky-Helldorf on 09.07.19.
//  Copyright © 2019 Lars Sonchocky-Helldorf. All rights reserved.
//

import SwiftUI
import Combine

struct RegistrationView : View {
    @ObjectBinding var registrationModel = RegistrationModel()
    
    @State private var registrationButtonDisabled = true
    
    @State private var validatedEMail: String = ""
    @State private var validatedPassword: String = ""
    
    var body: some View {
        Form {
            Section {
                TextField("Enter your EMail", text: $registrationModel.eMail)
                SecureField("Enter a Password", text: $registrationModel.password)
                SecureField("Enter the Password again", text: $registrationModel.passwordRepeat)
                Button(action: registrationButtonAction) {
                    Text("Create Account")
                }
                .disabled($registrationButtonDisabled.value)
                    .onReceive(self.registrationModel.validatedCredentials) { newValidatedCredentials in
                        self.registrationButtonDisabled = (newValidatedCredentials == nil)
                }
            }
            
            Section {
                Text("Validated EMail: \(validatedEMail)")
                    .onReceive(self.registrationModel.validatedEMail) { newValidatedEMail in
                        self.validatedEMail = newValidatedEMail != nil ? newValidatedEMail! : "EMail invalid"
                }
                Text("Validated Password: \(validatedPassword)")
                    .onReceive(self.registrationModel.validatedPassword) { newValidatedPassword in
                        self.validatedPassword = newValidatedPassword != nil ? newValidatedPassword! : "Passwords to short or don't match"
                }
            }
        }
        .navigationBarTitle(Text("Sign Up"))
    }
    
    func registrationButtonAction() {
        
    }
}

class RegistrationModel : BindableObject {
    
    @Published var eMail: String = ""
    @Published var password: String = ""
    @Published var passwordRepeat: String = ""
    
    public var willChange = PassthroughSubject<Void, Never>()
    
    var validatedEMail: AnyPublisher<String?, Never> {
        return $eMail
            .debounce(for: 0.5, scheduler: RunLoop.main)
            .removeDuplicates()
            .map { username in
                return Future { promise in
                    print("username: \(username)")
                    self.usernameAvailable(username) { available in
                        promise(.success(available ? username : nil))
                    }
                }
        }
        .switchToLatest()
            .eraseToAnyPublisher()
    }
    
    var validatedPassword: AnyPublisher<String?, Never> {
        return Publishers.CombineLatest($password, $passwordRepeat)
            .debounce(for: 0.5, scheduler: RunLoop.main)
            .map { password, passwordRepeat in
                print("password: \(password), passwordRepeat: \(passwordRepeat)")
                guard password == passwordRepeat, password.count > 5 else { return nil }
                return password
        }
        .eraseToAnyPublisher()
    }
    
    var validatedCredentials: AnyPublisher<(String, String)?, Never> {
        return Publishers.CombineLatest(validatedEMail, validatedPassword)
            .map { validatedEMail, validatedPassword in
                print("validatedEMail: \(validatedEMail ?? "not set"), validatedPassword: \(validatedPassword ?? "not set")")
                guard let eMail = validatedEMail, let password = validatedPassword else { return nil }
                return (eMail, password)
        }
        .eraseToAnyPublisher()
    }
    
    
    func usernameAvailable(_ username: String, completion: (Bool) -> Void) {
        let isValidEMailAddress: Bool = NSPredicate(format:"SELF MATCHES %@", "[A-Z0-9a-z._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,64}").evaluate(with: username)
        
        completion(isValidEMailAddress)
    }
}

#if DEBUG
struct RegistrationView_Previews : PreviewProvider {
    static var previews: some View {
        RegistrationView()
    }
}
#endif

running this I now get:

username: 
username: 
password: , passwordRepeat: 
username: 
password: , passwordRepeat: 
username: 
username: 
username: test
username: test
username: test@
username: test@
username: test@test.com
username: test@test.com
password: , passwordRepeat: 
username: test@test.com
username: test@test.com
2019-07-22 16:54:36.302558+0200 Combine-Beta-Feedback[21023:3503157] [Assert] Error in UIKit client: -[UIWindow setScreen:] should not be called if the client adopts UIScene lifecycle. Call -[UIWindow setWindowScene:] instead.
2019-07-22 16:54:36.391008+0200 Combine-Beta-Feedback[21023:3503373] [AutoFill] Cannot show Automatic Strong Passwords for app bundleID: de.silpion.Combine-Beta-Feedback due to error: iCloud Keychain is disabled
2019-07-22 16:54:36.841545+0200 Combine-Beta-Feedback[21023:3503157] [Common] _BSMachError: port 7507; (os/kern) invalid capability (0x14) "Unable to insert COPY_SEND"
password: tester, passwordRepeat: 
2019-07-22 16:54:42.943416+0200 Combine-Beta-Feedback[21023:3503376] [AutoFill] Cannot show Automatic Strong Passwords for app bundleID: de.silpion.Combine-Beta-Feedback due to error: iCloud Keychain is disabled
2019-07-22 16:54:42.943549+0200 Combine-Beta-Feedback[21023:3503157] [Assert] Error in UIKit client: -[UIWindow setScreen:] should not be called if the client adopts UIScene lifecycle. Call -[UIWindow setWindowScene:] instead.
password: tester, passwordRepeat: tester
password: tester, passwordRepeat: tester
username: test@test.com
username: test@test.com

and that's it. The line:

print("validatedEMail: \(validatedEMail ?? "not set"), validatedPassword: \(validatedPassword ?? "not set")")

is never reached. Any Ideas? @Philippe_Hausler

Hi @DevAndArtist,

I've got that bug in my queue now and I think the behavior you're seeing is about the PassthroughSubject and not flatMap. PassthroughSubject will drop values if the downstream has not made any demand for them. Because flatMap with maxPublishers: .max(1) will only request one value at a time from upstream, and due to the delay, all values other than the first 1 are dropped.

This works (please forgive my slight reformatting as I adjusted this to fit into my test harness):

        let something = [1, 2, 3, 4, 5, 6, 7, 8, 9].publisher
            .flatMap(maxPublishers: .max(1)) { value in
                [Int].init(repeating: value, count: value).publisher
                    .flatMap { value in
                        Just(value)
                            .delay(for: .seconds(Double.random(in: 1 ... 4)), scheduler: RunLoop.main)
                    }
            }
            .sink { value in
                print(value)
            }
                
        DispatchQueue.main.asyncAfter(deadline: .now() + 300) {
            something.cancel()
        }
        RunLoop.main.run(until: Date() + 400)

or this:

        let subject = PassthroughSubject<Int, Never>()
        let something = subject
            .buffer(size: 9, prefetch: .byRequest, whenFull: .dropOldest)
            .flatMap(maxPublishers: .max(1)) { value in
                [Int].init(repeating: value, count: value).publisher
                    .flatMap { value in
                        Just(value)
                            .delay(for: .seconds(Double.random(in: 1 ... 4)), scheduler: RunLoop.main)
                    }
            }
            .sink { value in
                print(value)
            }
        
        let c = [1, 2, 3, 4, 5, 6, 7, 8, 9].publisher.subscribe(subject)
        
        DispatchQueue.main.asyncAfter(deadline: .now() + 300) {
            something.cancel()
        }
        RunLoop.main.run(until: Date() + 400)
        c.cancel()

I haven't yet checked the rest of the comparison to concatMap you put in the bug (thank you for all of that).

3 Likes

Hi @Tony_Parker, thank you for tackling this issue. I must admit that I'm not familiar with the concept of back pressure yet, it does not exist in context of RxSwift. Therefore my assumptions about the behavior of reactive streams might be slightly off then defined in Combine.

I eventually tried to build my own ConcatMap publisher, but I failed at several points:

  • Mine only operates on finite upstreams - this is totally wrong
  • Instead of switchToLatest, which does not work, I had to use flatMap again.

This example produces the expected output, but it's still not fully correct as just mentioned.

extension Publishers {
  public struct ConcatMap<NewPublisher, Upstream>:
    Publisher
    where
    NewPublisher: Publisher,
    Upstream: Publisher,
    NewPublisher.Failure == Upstream.Failure
  {
    public typealias Output = NewPublisher.Output
    public typealias Failure = Upstream.Failure
    
    public let upstream: Upstream
    public let transform: (Upstream.Output) -> NewPublisher
    
    public init(
      upstream: Upstream,
      transform: @escaping (Upstream.Output) -> NewPublisher
    ) {
      self.upstream = upstream
      self.transform = transform
    }
    
    public func receive<S>(subscriber: S)
      where
      S: Subscriber,
      NewPublisher.Output == S.Input,
      Upstream.Failure == S.Failure
    {
      let transform = self.transform
      let start = Empty<Output, Failure>(completeImmediately: true)
        .eraseToAnyPublisher()
      // This implementation is meh, because it requires `upstream` to be
      // finite and complete first before we can start appending.
      upstream
        .map { output -> AnyPublisher<Output, Failure> in
          transform(output).eraseToAnyPublisher()
        }
        .reduce(start) { result, publisher in
          result.append(publisher).eraseToAnyPublisher()
        }
        // I don't know why flat map is needed here.
        .flatMap { $0 }
        // I though `switchToLatest` like `flatMapLatest` from RxSwift is
        // what I need here to kick off everything, but it does not work.
//        .switchToLatest()
        .receive(subscriber: subscriber)
    }
  }
}

extension Publisher {
  public func concatMap<T, P>(
    _ transform: @escaping (Self.Output) -> P
  ) -> Publishers.ConcatMap<P, Self>
    where
    T == P.Output,
    P: Publisher,
    Self.Failure == P.Failure
  {
    return Publishers.ConcatMap(upstream: self, transform: transform)
  }
}

let page = PlaygroundPage.current
page.needsIndefiniteExecution = true

let publisher = PassthroughSubject<Int, Never>()

let something = publisher
  .concatMap { value in
    Publishers.Sequence(sequence: [Int](repeating: value, count: value))
      .flatMap { value in
        Just(value).delay(
          for: .seconds(Double.random(in: 1 ... 4)),
          scheduler: RunLoop.main
        )
      }
  }
  .sink { value in
    print(value)
  }

Publishers.Sequence(sequence: [1, 2, 3, 4, 5, 6, 7, 8, 9]).subscribe(publisher)

DispatchQueue.main.asyncAfter(deadline: .now() + 120) {
  something.cancel()
  page.finishExecution()
}

I think in case of concatMap we really shouldn't make any assumption about the upstream. I linked a few articles related to the operator, which also mention that concatMap isn't efficient in terms of async work, because if the upstream will fire rapidly but concatMap will process everything slower and preserve order and wait until each merged publisher completes, the processing time may grow very fast. The made up example above is just perfect for that scenario, but it's exactly what I expect from this operator.

In our app I only have two usages of concatMap. First, that does firmware updates in sequence, handles errors if any happen, retries if needed, then process any pending firmware updates. Second, we queue bluetooth events and transform those into UI events that are synced with a specific time interval. So if events arrive faster, they will be enqueued, then processed when needed by the interval tick.


This leads me to the question, does .buffer(size: ..., prefetch: .byRequest ... acts like a queue, or are all buffered elements always kept in the buffer?

I rewrote the ConcatMap operator using buffer and flatMap(maxPublishers:), but I don't know if Buffer will eventually explode if it's not acting like a FIFO queue.

extension Publishers {
  public struct ConcatMap<NewPublisher, Upstream>:
    Publisher
    where
    NewPublisher: Publisher,
    Upstream: Publisher,
    NewPublisher.Failure == Upstream.Failure
  {
    public typealias Output = NewPublisher.Output
    public typealias Failure = Upstream.Failure
    
    public let upstream: Upstream
    public let transform: (Upstream.Output) -> NewPublisher
    
    public init(
      upstream: Upstream,
      transform: @escaping (Upstream.Output) -> NewPublisher
    ) {
      self.upstream = upstream
      self.transform = transform
    }
    
    public func receive<S>(subscriber: S)
      where
      S: Subscriber,
      NewPublisher.Output == S.Input,
      Upstream.Failure == S.Failure
    {
      upstream
        // - Is this potentially a FIFO queue?
        // - I hope it uses a (dynamic) Array so it can grow and 
        //   shrink in size as needed.
        .buffer(size: .max, prefetch: .byRequest, whenFull: .dropNewest)
        .flatMap(maxPublishers: .max(1), transform)
        .receive(subscriber: subscriber)
    }
  }
}
1 Like

Hi @DevAndArtist,

Sorry about the late replies, things are pretty crazy right now. :slight_smile:

Buffer has a few behaviors depending on its configuration. It should not cause a fatal error or abort (assuming that's what you mean by explode). It is possible to configure it to send an error if it fills up.

Here are the options:

    //       keepFull
    //           subscribe(subscription): starts with demand of buffer size
    //           afterwards will use upstream demand to try to keep the buffer as full as possible
    //       byRequest
    //           simply forwards downstreams demand requests to upstream

    public enum PrefetchStrategy {
        case keepFull
        case byRequest
    }

    public enum BufferingStrategy<Failure: Swift.Error> {
        case dropNewest
        case dropOldest
        case customError(() -> Failure)
    }

Here I took advantage of the fact that I knew that your upstream would only have a max of 9 elements and created a buffer to hold all 9. The Sequence Publisher will go ahead and send all 9 to the buffer, since it was created with a keepFull strategy. The flatMap then slowly drains it, one at a time.

1 Like

By 'slowly drains it' do you mean in a FIFO way or LIFO (if we choose dropNewest). So theoretically if flatMap drains the buffer fast enough it may never reach its max size (that‘s what I meant by explosion).

Does the buffer preallocate capacity? What happens if I set max buffer size to Int.max?


I just realized in my implementation above it probably should be dropOldest not dropNewest because I need FIFO.

Edit: wrong, it‘s for error handling, I guess I‘m already tired.

buffer only supports emitting the elements in the order received (at least for now). The drop strategy controls which elements are lost, not the order in which they are re-transmitted downstream. So we don't do LIFO, exactly.

You are correct that if flatMap drains the buffer fast enough, it will not reach its max size.

Right now (although this is an implementation detail and subject to change), buffer just stores the values as a plain old Swift array and checks the current size vs the max capacity specified. We may in the future preallocate space, so it may not be a great idea to ask for Int.max. Although, frankly, we'd probably have to special case that value anyway.

3 Likes

Okay gotcha, it would be great if we had an option where the buffer size is unbound and grows dynamically or ahead when needed (maybe some linked list style). I would expect such a behavior from a concatMap operator where the only limitation is how much hardware resources we can get, although I don‘t fully remember how RxSwift did that, they may also preallocate a buffer actually.

So theoretically I can build my own operator now which would behave like I‘d expect it to behave which is great, thank you for helping me figuring that out. You can either close the ticket or add a ConcatMap type to Combine now. :smirk::see_no_evil:

No problem, glad to help. I'm keeping the bug for now because I would like to gather some additional feedback from others before deciding if we want to add the more explicit name or just refer people to this pattern explicitly.

1 Like

Hi Tony,

Can you please expand the prefetch strategy keepFull? I can understand the initial demand equal to the buffer size, but don't understand what it means to "to use upstream demand to try to keep the buffer as full as possible". More specifically, I'm perplexed by the notion of "upstream demand", as I thought I understood demand to be a concept that applied to the downstream subscriber.

Cheers,
-Patrick

I agree the word upstream in that comment is confusing. Or maybe just wrong.

What keepFull does is to request values from upstream itself in order to attempt to hold N values in the buffer. In other words, when downstream comes along and takes values from the buffer, the buffer will request that number of values from upstream.

1 Like