Combine: what are those multicast functions for?

I've been doing some experimenting and found this to be the case. I would like to confirm something, though. I notice it starts by requesting demand equal to the size of the buffer. However, I also observed that it requests additional demand based on requests received from the downstream subscriber. Is this indeed the case? Will this additional demand equal the downstream demand?

I will double-check, but the idea here is that if the downstream can hold N values, the buffer has M slots open, and the strategy is keepFull, then we should demand N + M from upstream so as to satisfy the downstream and keep the buffer full.

Furthermore, flatMap does not guarantee any serialization. If you flatten A and B sequences, it's not guaranteed that A will emit first, followed by B. The order B then A is valid as well. It's also an operator that can create a lot of hard-to-find retain cycles and leaked memory.

I disagree! Please see https://stackoverflow.com/questions/59743938/combine-framework-serialize-async-operations/59889174#59889174

Can you please clarify what you disagree with? It's hard to guess if you just throw a big chunk of code at me. I showed an example upthread that the general flatMap won't guarantee serialization.

I disagree with the exact sentence "flatMap does not guarantee any serialization". flatMap has a parameter maxPublishers with a default value of Subscribers.Demand.unlimited. Just set it to Subscribers.Demand.max(1) and you have perfect serialization. Check the link and see the examples there.

I think he disagrees with the claim that “flatMap does not guarantee any serialization”.

Apparently (judging from that Stack Overflow answer), if you call flatMap(maxPublishers: .max(1)), flatMap only passes .max(1) demand upstream, so it will only get a single "inner" publisher to subscribe to. Then flatMap waits for that inner publisher to finish before passing another .max(1) demand upstream. So flatMap(maxPublishers: .max(1)) is only ever subscribed to one inner publisher at a time. It therefore concatenates the sequences of values from the inner publishers, instead of intermingling them.
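A small sketch of this (the names are illustrative, not from the answer): each outer value is mapped to an inner publisher of tagged values, and with maxPublishers: .max(1) flatMap drains each inner publisher to completion before requesting the next outer value.

```swift
import Combine

// With maxPublishers: .max(1), flatMap subscribes to one inner
// publisher at a time and waits for it to finish before requesting
// the next outer value, so the inner sequences are concatenated.
var received: [String] = []
let cancellable = [1, 2].publisher
    .flatMap(maxPublishers: .max(1)) { n in
        (1...3).publisher.map { "inner\(n)-\($0)" }
    }
    .sink { received.append($0) }

// Note: these synchronous inner publishers complete immediately, so even
// unlimited demand would happen to produce this order; the .max(1) setting
// is what guarantees it when the inner publishers are asynchronous.
print(received)
// ["inner1-1", "inner1-2", "inner1-3", "inner2-1", "inner2-2", "inner2-3"]
cancellable.cancel()
```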

This is a really useful insight that is not at all obvious from the documentation. I look forward to the day Combine has complete documentation specifying precisely what all its many operators actually do.

That is what we are looking for too. Almost everything we know is based on our own "try and see" experiments. That's really annoying. For instance, I don't have any idea how multicast works...

I still don't understand. Have you read the entire thread, though? I said that I was not familiar with the concept of back pressure, which Combine's flatMap operator is built on top of. A general flatMap, which is the same as flatMap with unlimited demand, does not guarantee serialization. That was my whole point. I have years of experience with RxSwift, and many of my assumptions about Combine's operators are derived from there.

It's okay to disagree, but you could have done it in a more understandable way. :wink:

Take it easy. This discussion is about Apple's Combine framework. I don't like to argue with you; I like to share my own experimentally gained knowledge, which could help others.

I'm cool, but you started disagreeing with something I wrote without much context. To me this is counterproductive and does not help in understanding your point. ;) Next time I highly encourage you to be more specific.

There are three things to distinguish:

  • Subject

  • Multicast

  • Share

Subject is already a class so we get reference semantics. Thus a Subject is already a multicaster and there is no need to do anything else.
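To see that in a couple of lines (variable names are just illustrative): a single subject instance can feed any number of subscribers directly, with no extra operator.

```swift
import Combine

// Because a Subject is a class (reference semantics), one instance
// can deliver each value to every attached subscriber.
let subject = PassthroughSubject<Int, Never>()
var first: [Int] = []
var second: [Int] = []
let c1 = subject.sink { first.append($0) }
let c2 = subject.sink { second.append($0) }

subject.send(7)
subject.send(8)
// Both subscribers receive every value sent after they attached.
print(first, second)   // [7, 8] [7, 8]
```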

Multicast takes advantage of that fact: it takes as its parameter a function that produces a Subject and holds on to it, allowing us to multicast to any number of subscribers. It is a ConnectablePublisher, so the upstream won't actually start publishing until you say connect.

Share is a convenience wrapper for Multicast; it appends autoconnect, so the upstream starts publishing as soon as it is subscribed to.
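A quick sketch of the multicast/connect half of that (share() would behave roughly the same but start on the first subscription instead of on connect()):

```swift
import Combine

// multicast wraps the upstream in a subject of our choosing and
// defers the upstream subscription until connect() is called.
let multicasted = (1...3).publisher
    .map { $0 * 10 }
    .multicast { PassthroughSubject<Int, Never>() }

var a: [Int] = []
var b: [Int] = []
let c1 = multicasted.sink { a.append($0) }
let c2 = multicasted.sink { b.append($0) }

// Nothing has been published yet; both subscribers are attached
// before the upstream starts, so both receive every value.
let connection = multicasted.connect()
print(a, b)   // [10, 20, 30] [10, 20, 30]
```

Had this been share() with a synchronous upstream, the whole sequence would have been delivered to the first sink at the moment it subscribed, and the second sink would have missed everything; connect() lets us attach everyone first.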

I now have a free online tutorial with examples that may help clarify:

https://www.apeth.com/UnderstandingCombine/operators/operatorsSplitters/operatorssplitters.html

https://www.apeth.com/UnderstandingCombine/operators/operatorsSplitters/operatorsshare.html

https://www.apeth.com/UnderstandingCombine/operators/operatorsSplitters/operatorsmulticast.html

As for buffer, I can trace what it is doing and I have documented it:

https://www.apeth.com/UnderstandingCombine/operators/operatorsAccumulators/operatorsbuffer.html

but I find it difficult to believe that what it is doing is what is intended. There are two things that I find particularly strange:

  • The docs say that .byRequest is "A strategy that avoids prefetching and instead performs requests on demand." That is not true. On the contrary, prefetching is exactly what it does. In fact, it over-prefetches: it immediately fills the buffer and then starts throwing values away.

  • .keepFull doesn't just fill the buffer, it deliberately overflows the buffer and throws a value away. For example, if the buffer is full and the downstream requests a value, so that now there is a space in the buffer, the buffer requests two values from upstream — one to go into the buffer, and one to throw away.

So I regard buffer as useless because it is badly behaved.
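A sketch of the kind of test behind this (the OneAtATime subscriber and all values here are illustrative, not copied from the linked page): a subscriber with limited demand forces the buffer to actually buffer, and with .byRequest the unlimited up-front request means most of the values never survive.

```swift
import Combine

// A custom subscriber with limited demand, so the buffer fills up.
final class OneAtATime: Subscriber {
    typealias Input = Int
    typealias Failure = Never
    private(set) var received: [Int] = []
    var subscription: Subscription?
    func receive(subscription: Subscription) {
        self.subscription = subscription
        subscription.request(.max(1))   // take just one value up front
    }
    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        return .none                    // no further demand for now
    }
    func receive(completion: Subscribers.Completion<Never>) {}
}

let sub = OneAtATime()
(1...10).publisher
    .buffer(size: 3, prefetch: .byRequest, whenFull: .dropOldest)
    .subscribe(sub)

sub.subscription?.request(.max(10))     // now drain whatever survived
// Far fewer than 10 values arrive: one taken immediately plus at most
// the buffer's 3 survivors; the rest were dropped per dropOldest.
print(sub.received)
```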

Hi Matt,

Thanks for following up. Here is some info on the design (ignoring bugs for a second):

byRequest makes an unlimited upstream request once, when it receives the upstream subscription.

I don't think it's quite right to say that byRequest "over"-prefetches. It does attempt to fill its buffer by immediately requesting unlimited demand. That is why the other enum, BufferingStrategy, only makes sense with byRequest.

  • dropNewest: we drop the newly received value if the buffer is full.

  • dropOldest: we drop the first received (oldest) value in the buffer.

If there are immediately unlimited values available from upstream (or at least, more than the size of the buffer), then - yes - it throws values away according to the strategy. However, if the upstream is producing values at a lower rate than downstream is consuming them, byRequest should not be throwing anything away. If there is some burst of activity from upstream that temporarily puts the rate of receipt above the downstream usage, then buffer + byRequest should be a way to smooth that out.

I'm willing to admit that it's possible the name doesn't make this behavior clear.


keepFull makes upstream requests in 2 places:

  1. upon receiving subscription from upstream, when it requests size elements
  2. after sending values to downstream (upon request from downstream), it requests from upstream equal to the number of elements sent

If we are somehow doubling the request and throwing one away in step 2, that does seem like a bug to me. I'll attempt to repro it using our test harness and see what we can figure out.
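One way to observe those two request sites from outside is to put a handleEvents(receiveRequest:) probe between the upstream and the buffer and record every demand the buffer sends up. This is a sketch (the OneAtATime subscriber and the chosen sizes are illustrative):

```swift
import Combine

// Record every demand that buffer(.keepFull) sends upstream.
var upstreamRequests: [Subscribers.Demand] = []

// A downstream subscriber whose demand we control by hand.
final class OneAtATime: Subscriber {
    typealias Input = Int
    typealias Failure = Never
    var subscription: Subscription?
    func receive(subscription: Subscription) {
        self.subscription = subscription   // request nothing yet
    }
    func receive(_ input: Int) -> Subscribers.Demand { .none }
    func receive(completion: Subscribers.Completion<Never>) {}
}

let sub = OneAtATime()
(1...20).publisher
    .handleEvents(receiveRequest: { upstreamRequests.append($0) })
    .buffer(size: 4, prefetch: .keepFull, whenFull: .dropOldest)
    .subscribe(sub)

// Step 1: on subscription, keepFull requests the buffer size (.max(4)).
print(upstreamRequests)

// Step 2: the downstream takes one value; watch what the buffer then
// requests upstream to refill (per the stated design, .max(1)).
sub.subscription?.request(.max(1))
print(upstreamRequests)
```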

Hi Tony, great to hear from you! Really appreciate your work, and I thought your WWDC 2019 presentation of this stuff was superb. There are better and worse videos and yours was terrific.

The docs don't suggest that the buffering strategy (drop newest / oldest or throw) goes only with the byRequest prefetching strategy. The buffering strategy does in fact change the result no matter which prefetching strategy you use, and you do have to specify it in either case.

The name byRequest, taken at face value, would seem to mean something like "upon request" or "whenever requested", but that, as you say, is not what happens; instead, there's an unlimited demand at the outset. Perhaps a name like unlimitedImmediately would be better.

There is nothing in the buffer documentation that suggests that it is meant for a case where upstream production rate is slower than downstream consumption. I don't quite see why you'd need to buffer a production that is slower than you can consume. In my test example I've got it the other way round.

I can definitely prove that your keepFull step 2, "it requests from upstream equal to the number of elements sent", is not what actually happens. If you look at the page I linked to, I provide a simple test example. Both from the final output of the pipeline (reading it off with sink, for instance) and by instrumenting the code with print, I can see clearly that when the downstream turns to the buffer, asks for .max(1), and receives it, the buffer turns around and asks for .max(2) from the upstream. That is what makes no sense to me: the buffer knows perfectly well that it has just handed off one value from a full buffer, so there is only one space open. It seems perverse for it to ask for two values, knowing full well that either the second value or the first item already in the buffer (depending on the buffering strategy) is fated to be thrown away.
