After some experimenting today after seeing unexpected behavior I just realized that the throttled async stream behaves in a quite unexpected manner.
Basically the current semantics easily leads to lost Updates/wakeups.
AFAICT the current semantics are that if a new event comes in within the specified throttled time on the base async sequence, it’s not forwarded. But after the specified throttled time has elapsed- it’s not forwarded either until a new event comes from the base sequence.
So basically the guarantee is just that you don’t get more than X updates per time unit (and no more).
Reading the documentation a few times carefully and seeing the note that no task is needed it seems clear this is intentional, but I struggle to see when such semantics would be useful?
We’ve been doing a fair amount of throttling in other systems/contexts but we’d always had semantics that if there is a pending data item on a sequence we’d deliver it after the timer threshold has passed.
I see that the current implementation is efficient, but in what cases would it be useful?
Right now, with a throttle time of one second, if we would generate 1000 updates in one tenth of a second and then never update the sequence again, 999 updates would stay queued?
What would be a best practice with current async algorithms to have pending data delivered when the time have passed? Don’t others want such semantics?
To be clear, we use heavily bounded queues (in my example we had a queue length of one buffering newest on the base sequence and merged it with another similar sequence that is debiunced).
Am I missing anything? How are other ls using this in any robust way? What’s a good strategy for flushing the pending throttled events?
I appreciate that this is not quite what you had hoped for, but I do not think is quite as “unexpected” as you suggest. It is a fairly common throttle sort of pattern. E.g., RX does something very similar. See the throttle demonstration. I can’t recall a throttle implementation quite like how you have described it (though I have some faint memory of some Swift NIO proposal re an implementation with this sort of throttle/latest pattern).
Don't get me wrong: I can see the utility in the pattern you describe, but I just do not think this particular implementation is quite as unusual as you may have characterized it.
Yeah, if these are key events, then a traditional throttle might not be the right pattern.
Probably in those cases where you really have a continuous stream on your sequence. E.g., maybe for a constant stream of location updates where you periodically want to query a server.
Yeah, it’s probably not the right tool for a short-lived or inconsistent asynchronous sequence. It’s for a constant stream where you want to periodically pull one out and do something with it.
I find myself using debounce (e.g., for keyboard input) a lot more frequently. But that's obviously addressing a very different problem, despite its architectural similarities.
When I saw your post, this is the sort of thing I was thinking of. Using combineLatest of a timer sequence and your value sequence, and then a custom removeDuplicates predicate to filter out irrelevant updates.
Or you might just write your own sequence filter (perhaps some hybrid of combineLatest and throttle). The source is all right there, and it probably wouldn't be too bad. I just wouldn't call it “throttle”, to avoid confusing future devs.
I think we have to rework throttle do the following.
When the first call to next() on its iterator is made we have to spawn a child task which emits a value every specified interval
Whenever next() on its iterator is called we have to call next on the base sequence
a. We have to store the value and return it once the child task from 1. produces a value
b. We have to continue to call next on the base until we returned one value
This will make the implementation a lot more complicated but IMO it will result in the right behaviour cc @Philippe_Hausler
As far as I can tell, the semantics of a throttle operator in a stream is "discard all but the first event in a throttle time window". The concept described in the OP of "delaying events in bulk until the throttle time window ends" is not how RxSwift, Combine et cetera work: I'd say instead that this is the behavior a message queue, rather than a stream operator.
Oh, I think that is how every single throttle implementation I've seen on the server side has worked since the mid-90:s - in general, a throttle is a rate-limiting device that puts a ceiling on the maximum throughput, e.g. bandwidth throttling and has it's conceptual background from the throttle used in internal combustion engines.
The main point is that it restricts a flow, not discard items up until a time have passed. It's possible that RX/Combine have different semantics, but they then diverge from what is a fairly long established practice of what throttling is.
E.g. if you have bandwidth trottling to your ISP, I think you would find it unexpected that it drops all packets sent over the throttle limit in a given time window unless you send more data later? That is the current behaviour.
If you periodically want to pull one out and do something with it, one should probably use a timer which provides those semantics. Not sure I understand the 'inconsistent asynchronous sequence' argument - I'd argue that the only asynchronous streams that are not 'inconsistent' in that manner are those based of a timer source (or derived from one).
I must be missing something but I have a hard time seeing the current semantics that useful for anything that is not time-derived, which seems like an extremely narrow use case for a generic flow limiter such as throttle...
There is a problem with this style of behavior: if we preserve all emitted elements it means that a fast producer (one that should be throttled) will end up buffering those elements indefinitely. That buffer would then grow indefinitely.
That's not what I am suggesting. We should issue demand to the base sequence once we got demand in throttle. While we do have demand we should demand as long from the base until the interval passed and we can produce a value. Right now the latter part is missing because we do not return a value once the interval passed but only once both the interval passed and the base produced a subsequent value (which may never happen)
It is a bit unfortunate because we cannot spell this with structured Concurrency since we need to race the interval with the upstream similar to how we did it in debounce.
I think I agree with @hassila here. As I see it, it should just slow down delivery of events if they come too fast.
It should throw away any extra events, but ultimately one could choose to "reduce" those extra events in some way instead. (E.g. adding numbers, joining arrays.)
So I would expect this:
Send the first event immediately (it didn't come too fast!). Start the throttle window when it arrived.
If the next event comes after the throttle window has passed, send that immediately too, as that one did't come too fast either. Start the throttle window when that event arrived.
If the next event comes inside the throttle window, store it in the buffer-of-one
If more events come inside the throttle window, replace the contents of the buffer-of-one
When the throttle window ends, send the contents of the buffer-of-one (the last event sent in the throttle window). (Edit: If any event was sent here, start the throttle window now. This means that if events come in fast we will emit events at regular time intervals.)
This is very useful for e.g. UI, as we want events immediately if possible, but we don't want intermediate events if they come in too fast.
So to do that we would need the iteration of that upstream to be in its own task.
As you are describing it; that is a forward time measuring throttle; shape wise that belongs in the family of debounce. I guess the point I start to wonder is - are there really two things here? or is this just somehow a parent of debounce? and debounce is a specialization.
That would perhaps infer that the throttle shape (with looking backwards from elapsed time) is a different category of algorithm.
Your description is also how I understand the desired semantics of throttle to be. Throttle maximises the output rate (and minimises delay) w.r.t. the constraint of no more than one output per interval units of time.
I have captured the semantics in the following expected (but currently failing) test cases:
As you see, the output of throttle is fairly a stable once-in-3-steps stream of values, except where the input stream yields less frequently; contrast that with debounce which only yields values when the input is producing less frequently.
So how is throttle related to debounce? For the above inputs, debounce(for: .steps(3)) would behave like so, if I'm not mistaken:
There is one more design issue which I'd like to address, the defaulted argument latest: Bool = true. I absolutely agree that yielding the latest value is the correct default.
But to me, yielding the oldest value seems very contrived. When would you want to do that, and discard all information from the successive inputs?
Unless there is a good motivation otherwise, I'd rather omit the latest parameter altogether and only allow customisation in form of the more general reducing parameter for computing an aggregated value to yield.
Just want to chime in and say that @JJJ and @pyrtsa describes exactly the behaviour I would expect too and a +1 for removing latest and just have reducing (which could be added to @JJJ's description under point 4, as the ability to merge ('reduce') is key for throttling - it's there already, so latest doesn't really give anything).
I also don't quite see how debounce and throttle are that closely related, they are complementary in many cases (for UI doing a merge of one throttled and one debounced stream is often handy, where we e.g. can drive search input from the keyboard on a debounced stream, direct interaction clicks such as buttons on the throttled stream and then just merge them).