It's not well documented, but AsyncPublisher, which is what values returns, does not buffer values at all. In a for await loop, you only receive values while the loop is (implicitly) awaiting the result of next().
If the AsyncPublisher's upstream produces multiple values while the receiving end is processing the body of the for await loop, the earlier values are lost (and so is the latest value, if the original source is unbuffered, such as a PassthroughSubject).
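A minimal sketch of that loss (the names and delay here are illustrative, and exactly which values are dropped depends on timing):

```swift
import Combine

let subject = PassthroughSubject<Int, Never>()

let task = Task {
    for await value in subject.values {
        print("received", value)
        // Simulate a slow loop body; no demand is pending while we sleep.
        try? await Task.sleep(nanoseconds: 1_000_000_000)
    }
}

// Values sent while the loop body is still running find no pending
// demand, so AsyncPublisher silently drops them.
subject.send(1)
subject.send(2)
subject.send(3)
```

In practice, 2 and 3 (and possibly 1 as well, if the iterator hasn't attached to the subject yet) never arrive in the loop.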
If you want it to buffer, replace pricePublisher.values with pricePublisher.buffer(size: .max, prefetch: .byRequest, whenFull: .dropOldest).values. Or maybe a size less than .max if you can't control the upstream.
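Concretely, assuming a pricePublisher and a processPrice like those in the original question, the buffered replacement looks like this:

```swift
import Combine

let pricePublisher = PassthroughSubject<Double, Never>()

// Insert a buffer between the upstream and .values so that values
// published while the loop body is running are queued, not dropped.
let bufferedPrices = pricePublisher
    .buffer(size: .max, prefetch: .byRequest, whenFull: .dropOldest)
    .values

Task {
    for await price in bufferedPrices {
        // await processPrice(price)  // hypothetical slow consumer
        _ = price
    }
}
```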
Be aware that .byRequest doesn't actually seem to work, and .buffer will request unlimited demand from its upstream, which could be catastrophic depending on exactly how the upstream behaves in response to such demand.
Questions:
Is there a way to test via code that .byRequest doesn't work (meaning that in reality it prefetches without the subscriber requesting values)?
By catastrophic, do you mean that when the buffer size is .max, the buffer could grow very large if the subscriber can't keep pace with the publisher?
My understanding is that every iteration of that for await loop requests a value, so once the async function processPrice completes, it requests the next value. Is that correct?
Sure, any upstream publisher that prints out or otherwise displays the downstream demand will show this.
Exactly, if upstream supply exceeds downstream capacity, the buffer will grow (to the bound). Particularly bad if "upstream supply" is something like an infinite sequence or drawing from a Collection that can produce extremely large numbers of elements extremely quickly.
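For example, Combine's built-in print() operator logs every demand request it relays, so a sketch like this (the subject and log prefix are illustrative) exposes what buffer asks for:

```swift
import Combine

let subject = PassthroughSubject<Int, Never>()

// The "upstream:" log shows what buffer requests on subscription.
// If buffer mirrored downstream demand, you'd expect "request max: (1)";
// the claim above is that it logs "request unlimited" instead.
let values = subject
    .print("upstream")
    .buffer(size: 16, prefetch: .byRequest, whenFull: .dropOldest)
    .values

Task {
    for await value in values {
        print("got", value)
    }
}
```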
It's not quite clear to me what you're asking. AsyncSequences can only produce elements in response to calls to AsyncIteratorProtocol.next(). AsyncPublisher turns each such call into a Combine Demand.max(1), and returns the first element published by its upstream after that demand is established. Upstream publishers that don't respect that demand can publish when the AsyncPublisher isn't ready, and it simply drops those elements. That's why you need the buffer. Theoretically, the buffer's .byRequest mode would mirror that downstream demand upstream, which is what you want: you'd get the values published in response to the AsyncPublisher's demand from the for await loop, plus any values published without demand, but a well-behaved upstream still wouldn't ever see more than .max(1) of demand, and wouldn't flood you. But instead, buffer always sends Demand.unlimited upstream, which causes the upstream to supply values as fast as it can.
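That Demand.max(1)-per-iteration pattern can itself be observed with print(), this time with no buffer in between (names are illustrative):

```swift
import Combine

let subject = PassthroughSubject<Int, Never>()

// With no buffer, the "direct:" log should show "request max: (1)"
// once at subscription and again after each value the loop consumes.
let values = subject
    .print("direct")
    .values

Task {
    for await value in values {
        print("got", value)
    }
}
```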
Thanks a lot for the clear explanation; please bear with me while I try to understand this better.
I am not sure why the buffer would send Demand.unlimited. Wouldn't it make more sense for the buffer implementation to send Demand.max(<buffer size>)?
I am assuming that even if it does send Demand.unlimited, as long as the buffer size is small or reasonable it would be fine, since only that many elements are kept.
So is it fair to say that as long as we set the buffer size to something reasonable (whatever is suitable), it is fine to use the publisher buffer?
It's possible that buffer only sends .unlimited when the buffer size itself is .max; I don't know. You'd have to experiment. The right compromise might depend on the exact upstream publisher too (e.g. a CombineLatest3 can supply up to 3 elements from a .max(1) demand, and a PassthroughSubject always ignores demand and can supply arbitrarily many elements).
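A sketch of that experiment, again using print() to expose the demand a bounded buffer sends upstream (the size and log prefix here are arbitrary):

```swift
import Combine

let subject = PassthroughSubject<Int, Never>()

// If a bounded buffer were smarter, this would log "request max: (8)";
// if it behaves like the unbounded case, it logs "request unlimited".
let values = subject
    .print("bounded")
    .buffer(size: 8, prefetch: .byRequest, whenFull: .dropOldest)
    .values

Task {
    for await value in values {
        print("got", value)
    }
}
```

Swapping size: 8 for size: .max and comparing the two logs would answer the question directly.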