I've been doing some experimenting and found this to be the case. I would like to confirm something though. I notice it starts by requesting a demand equal to the size of the buffer. However, I also observed that it requests additional demand based on requests for demand received from the downstream subscriber. Is this indeed the case? Will this additional demand be equal to the downstream demand?
I will double check, but the idea here is that if the downstream can hold N values and the buffer has size M open, and the strategy is keepFull, then we should demand N + M from upstream so as to satisfy the downstream plus keep the buffer full.
flatMap does not guarantee any serialization. If you flattened A and B sequences, it's not guaranteed that A will emit first, followed by B. The order B then A is valid as well. It's also an operator that will create a lot of hard-to-find retain cycles and leaked memory.
Can you please clarify what you disagree with? It's hard to guess if you just throw a big chunk of code at me. I showed an example upthread that the general flatMap won't guarantee serialization.
I disagree with the exact sentence... flatMap has a parameter maxPublishers with default value Subscribers.Demand.unlimited. Just set it to Subscribers.Demand.max(1) and you have perfect serialization. Check the link and see the examples there.
I think he disagrees with the claim that “flatMap does not guarantee any serialization”.
Apparently (judging from that Stack Overflow answer) if you call flatMap(maxPublishers: .max(1)), flatMap only passes .max(1) demand upstream, so it will only get a single “inner” publisher to subscribe to. Then flatMap waits for that inner publisher to finish before passing another .max(1) demand upstream. So flatMap(maxPublishers: .max(1)) is only ever subscribed to one inner publisher at a time. It therefore concatenates the sequences of values from the inner publishers, instead of intermingling them.
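To make the difference concrete, here is a minimal sketch, assuming two hand-driven inner publishers (the names `a`, `b`, `interleaved` and `serialized` are made up for illustration). It runs the same emissions through the default flatMap and through flatMap(maxPublishers: .max(1)) so the two results can be compared.

```swift
import Combine

// Two hypothetical inner publishers whose emissions we control by hand.
let a = PassthroughSubject<Int, Never>()
let b = PassthroughSubject<Int, Never>()

var interleaved: [Int] = []
var serialized: [Int] = []

// Default maxPublishers (.unlimited): both inner publishers are subscribed at once.
let unlimitedPipeline = [a, b].publisher
    .flatMap { $0 }
    .sink { interleaved.append($0) }

// maxPublishers: .max(1): b is only subscribed once a has finished.
let serialPipeline = [a, b].publisher
    .flatMap(maxPublishers: .max(1)) { $0 }
    .sink { serialized.append($0) }

a.send(1)
b.send(100)                     // the serial pipeline is not subscribed to b yet
a.send(2)
a.send(completion: .finished)   // now the serial pipeline moves on to b
b.send(3)
b.send(completion: .finished)

print(interleaved)
print(serialized)
```

Because a PassthroughSubject does not replay, whatever `b` sends before `a` finishes never reaches the serial pipeline. With publishers that only start work on subscription (a network request, say) nothing would be lost; the second one would simply start later.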
This is a really useful insight that is not at all obvious from the documentation. I look forward to the day Combine has complete documentation specifying precisely what all its many operators actually do.
That is what we are looking for too. Almost everything we know is based on our own "try and see". That's really annoying. Let's say, I don't have any idea how multicast works ...
I still don't understand. Have you read the entire thread though? I said that I was not familiar with the concept of back pressure which Combine's flatMap operator is built on top of. A general flatMap, which is equivalent to the flatMap with an unlimited demand, does not guarantee serialization. This was my whole point. I have years of experience with RxSwift and many of my assumptions about the operators from Combine are derived from there.
It's okay to disagree, but you could have done it in a more understandable way.
Take it easy. This discussion is about Apple's Combine framework. I don't like to argue with you; I like to share my own experimentally derived knowledge, which could help others.
I'm cool, but you started disagreeing with something I wrote without much context. To me this is counterproductive and does not help me understand your point. ;) Next time I highly encourage you to be more specific.
There are three things to distinguish:
Subject is already a class so we get reference semantics. Thus a Subject is already a multicaster and there is no need to do anything else.
Multicast takes advantage of that fact; it takes as its parameter a function that produces a Subject and holds on to that, thus allowing us to multicast to any subscribers. It is a ConnectablePublisher so the upstream won't actually start publishing until you say
Share is a convenience wrapper for Multicast; it appends autoconnect so the upstream starts publishing as soon as we are subscribed to.
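A minimal sketch of the three side by side, assuming a PassthroughSubject as the upstream and made-up names (`source`, `first`, `second`, `third`):

```swift
import Combine

let source = PassthroughSubject<Int, Never>()
var first: [Int] = []
var second: [Int] = []
var third: [Int] = []

// multicast takes a closure that creates the Subject it relays values through.
let multicasted = source
    .map { $0 * 2 }
    .multicast { PassthroughSubject<Int, Never>() }

let s1 = multicasted.sink { first.append($0) }
let s2 = multicasted.sink { second.append($0) }

source.send(1)                            // not connected yet: nothing reaches s1 or s2
let connection = multicasted.connect()    // ConnectablePublisher: start the upstream
source.send(2)                            // both subscribers receive 4

// share() needs no explicit connect(); subscribing is enough.
let shared = source.map { $0 * 2 }.share()
let s3 = shared.sink { third.append($0) }
source.send(3)                            // all three subscribers receive 6

print(first, second, third)
```

The map closure is there only to show that the operators upstream of multicast or share run once per value, however many subscribers are attached.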
I now have a free online tutorial with examples that may help clarify:
As for buffer, I can trace what it is doing and I have documented it:
but I find it difficult to believe that what it is doing is what is intended. There are two things that I find particularly strange:
- The docs say that .byRequest is "A strategy that avoids prefetching and instead performs requests on demand." That is not true. On the contrary, prefetching is exactly what it does. In fact, it over-prefetches. It immediately fills the buffer and then starts throwing values away.
- .keepFull doesn't just fill the buffer, it deliberately overflows the buffer and throws a value away. For example, if the buffer is full and the downstream requests a value, so that now there is a space in the buffer, the buffer requests two values from upstream: one to go into the buffer, and one to throw away.
So I regard buffer as useless because it is badly behaved.
Thanks for following up. Here is some info on the design (ignoring bugs for a second):
byRequest makes an unlimited upstream request once, when it receives upstream subscription.
I don't think it's quite right to say that byRequest "over" prefetches. It does attempt to fill its buffer by immediately requesting unlimited. That is why the other enum, BufferingStrategy, only makes sense with byRequest:
- dropNewest: we drop a received value if the buffer is full.
- dropOldest: we drop the first received value (oldest) in the buffer.
If there are immediately unlimited values available from upstream (or at least, more than the size of the buffer), then - yes - it throws values away according to the strategy. However, if the upstream is producing values at a lower rate than downstream is consuming them, byRequest should not be throwing anything away. If there is some burst of activity from upstream that temporarily puts the rate of receipt above the downstream usage, then buffer + byRequest should be a way to smooth that out.
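For anyone who wants to try the burst case themselves, here is a rough sketch. The helper `burst(whenFull:)` and the sizes are made up for illustration; it pushes five values into a size-2 byRequest buffer while the downstream has no outstanding demand, then lets the downstream ask for two. Which two values come out should depend on the buffering strategy, but treat the printed result as an observation to check rather than a documented guarantee.

```swift
import Combine

// Hypothetical helper: send a burst through a size-2 buffer with no downstream
// demand, then request two values and report what arrived.
func burst(whenFull: Publishers.BufferingStrategy<Never>) -> [Int] {
    let upstream = PassthroughSubject<Int, Never>()
    var subscription: Subscription?
    var received: [Int] = []

    // A subscriber that never asks for more on its own; demand is issued by hand.
    let manual = AnySubscriber<Int, Never>(
        receiveSubscription: { subscription = $0 },
        receiveValue: { received.append($0); return .none },
        receiveCompletion: { _ in }
    )

    upstream
        .buffer(size: 2, prefetch: .byRequest, whenFull: whenFull)
        .subscribe(manual)

    for value in 1...5 { upstream.send(value) }   // burst: more than the buffer holds
    subscription?.request(.max(2))                // downstream finally asks for two
    return received
}

print(burst(whenFull: .dropOldest))
print(burst(whenFull: .dropNewest))
```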
I'm willing to admit that it's possible the name doesn't make this behavior clear.
keepFull makes upstream requests in 2 places:
- upon receiving subscription from upstream, when it requests
- after sending values to downstream (upon request from downstream), it requests from upstream equal to the number of elements sent
If we are somehow doubling the request and throwing one away in step 2, that does seem like a bug to me. I'll attempt to repro it using our test harness and see what we can figure out.
Hi Tony, great to hear from you! Really appreciate your work, and I thought your WWDC 2019 presentation of this stuff was superb. There are better and worse videos and yours was terrific.
The docs don't suggest that the buffering strategy (drop newest / oldest or throw) goes only with the byRequest prefetching strategy. The buffering strategy does in fact change the result no matter which prefetching strategy you use, and you do have to specify it in either case.
byRequest on face value would seem to mean something like "upon request" or "whenever requested", but that, as you say, is not what happens; instead, there's an unlimited demand at the outset. Perhaps a name like unlimitedImmediately would be better.
There is nothing in the buffer documentation that suggests that it is meant for a case where upstream production rate is slower than downstream consumption. I don't quite see why you'd need to buffer a production that is slower than you can consume. In my test example I've got it the other way round.
I can definitely prove that your keepFull step 2 "it requests from upstream equal to the number of elements sent" is not what actually happens. If you look at the page I linked to, I provide a simple test example. Both using the final output of the pipeline (reading it off with sink, for instance) and instrumenting the code with
.max(1) and receives it, the buffer turns around and asks for a .max(2) from the upstream. That is what makes no sense to me; the buffer knows perfectly well that it has just handed off one value from a full buffer, so there is only one space open; so it seems perverse for it to ask for two values, knowing full well that the fate of either the second one or of the first item in the buffer (depending on the buffering strategy) will be to be thrown away.
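This is not the test from the linked page, just a minimal sketch of one way to instrument the pipeline, assuming handleEvents(receiveRequest:) placed directly above the buffer to record every demand the buffer sends upstream. The names and the buffer size are made up.

```swift
import Combine

let upstream = PassthroughSubject<Int, Never>()
var upstreamRequests: [Subscribers.Demand] = []
var downstreamSubscription: Subscription?
var received: [Int] = []

// Downstream that never asks for more on its own; we request by hand below.
let manual = AnySubscriber<Int, Never>(
    receiveSubscription: { downstreamSubscription = $0 },
    receiveValue: { received.append($0); return .none },
    receiveCompletion: { _ in }
)

upstream
    // Records each demand that the buffer passes to its upstream.
    .handleEvents(receiveRequest: { upstreamRequests.append($0) })
    .buffer(size: 4, prefetch: .keepFull, whenFull: .dropOldest)
    .subscribe(manual)

for value in 1...4 { upstream.send(value) }   // fill the buffer
downstreamSubscription?.request(.max(1))      // downstream takes a single value

// Compare the last recorded request with the single slot that just opened up.
print(upstreamRequests, received)
```

If the design described earlier holds, the request recorded after the downstream's .max(1) should itself be .max(1); seeing .max(2) there would reproduce the behaviour I am describing.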