For await with combine publisher.values

Hi,

I am using a Combine publisher, and I have an async function that needs to be called every time a new value is published.

I am publishing the values 0, 10, 20, 30, 40 and 50.

When I run it, I only receive 0, 10 and 50.

Questions:

  • Why am I not getting 0, 10, 20, 30, 40 and 50?
  • How can I fix it?
  • What is the correct way to use a Combine publisher with a Swift async function?

Environment:

  • Xcode 16.1 (16B40)
  • macOS command-line project
  • macOS 15.1 (24B83)

Code:

import Foundation
import Combine

actor Model {
    private let pricePublisher = CurrentValueSubject<Int, Never>(0)
    
    func simulatePriceChanges() async {
        print("simulatePriceChanges - start")
        pricePublisher.send(10)
        pricePublisher.send(20)
        pricePublisher.send(30)
        pricePublisher.send(40)
        pricePublisher.send(50)
        print("simulatePriceChanges - end")
    }
    
    func observePriceChanges() async {
        print("observePriceChanges - start")
        for await price in pricePublisher.values {
            await processPrice(price)
        }
        print("observePriceChanges - end")
    }
    
    private func processPrice(_ price: Int) async {
        print("processed price: \(price)")
    }
}

let model = Model()

Task {
    await model.observePriceChanges()
}

Task {
    try? await Task.sleep(for: .seconds(2))
    await model.simulatePriceChanges()
}

RunLoop.main.run()

Actual Output:

observePriceChanges - start
processed price: 0
simulatePriceChanges - start
simulatePriceChanges - end
processed price: 10
processed price: 50

It's not well documented, but AsyncPublisher, which is what values returns, does not buffer values at all. In a for await loop, you only receive values that are published while the loop is (implicitly) awaiting the result of next().

If the AsyncPublisher produces multiple values while the receiving end is processing the body of the for await loop, then the earlier values are lost (and so would be the latest value if the original source was unbuffered such as a PassthroughSubject).

If you want it to buffer, replace pricePublisher.values with pricePublisher.buffer(size: .max, prefetch: .byRequest, whenFull: .dropOldest).values. Or maybe a size less than .max if you can't control the upstream.
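As a rough sketch of that suggestion applied to the model from the question (the name BufferedModel, the bufferSize parameter, its default of 16, and the processed array are made up for illustration and are not from the original code):

```swift
import Combine
import Foundation

// Variation of the question's Model; only observePriceChanges differs.
actor BufferedModel {
    private let pricePublisher = CurrentValueSubject<Int, Never>(0)

    // Hypothetical addition: records what was processed so it can be inspected.
    private(set) var processed: [Int] = []

    func simulatePriceChanges() {
        for price in [10, 20, 30, 40, 50] {
            pricePublisher.send(price)
        }
    }

    // bufferSize is an assumed, arbitrary bound; pick one that suits the upstream.
    func observePriceChanges(bufferSize: Int = 16) async {
        let prices = pricePublisher
            .buffer(size: bufferSize, prefetch: .byRequest, whenFull: .dropOldest)
            .values
        // Values sent while the loop body is running are held by the buffer
        // (up to bufferSize) instead of being dropped by AsyncPublisher.
        for await price in prices {
            await processPrice(price)
        }
    }

    private func processPrice(_ price: Int) async {
        processed.append(price)
        print("processed price: \(price)")
    }
}
```

Driven the same way as in the question (one Task observing, another calling simulatePriceChanges() after a delay), this should process every published price as long as no more than bufferSize values pile up between iterations.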

Rob Napier wrote a good blog post on it, and also commented on NotificationCenter.Notifications, which further confuses things by buffering some values but not an unbounded number of them.

Be aware that .byRequest doesn't actually seem to work, and .buffer will request unlimited demand from its upstream, which could be catastrophic depending on exactly how the upstream behaves in response to such demand.

Thanks a lot @pyrtsa and @KeithBauerANZ

It is good to know there is a way to buffer

@KeithBauerANZ
I am confused by the following:

Be aware that .byRequest doesn't actually seem to work, and .buffer will request unlimited demand from its upstream, which could be catastrophic depending on exactly how the upstream behaves in response to such demand

Questions:

  1. Is there a way to test in code that .byRequest doesn't work (meaning that in reality it prefetches without the subscriber requesting values)?
  2. By catastrophic, do you mean that when the buffer size is .max, the buffer would grow very large if the subscriber can't keep pace with the publisher?
  3. My understanding is that every iteration of the for await loop requests a value. So once the async function processPrice completes, the loop would request the next value. Is that correct?
  1. Sure, any upstream publisher that will print out or otherwise display the downstream demand will show this.
  2. Exactly, if upstream supply exceeds downstream capacity, the buffer will grow (to the bound). Particularly bad if "upstream supply" is something like an infinite sequence or drawing from a Collection that can produce extremely large numbers of elements extremely quickly.
  3. It's not quite clear to me what you're asking. An AsyncSequence can only produce elements in response to calls to AsyncIteratorProtocol.next(). AsyncPublisher turns each call into a Combine demand of .max(1) and returns the first element its upstream publishes after that demand is established. Upstream publishers that don't respect that demand can publish when the AsyncPublisher isn't ready, and it simply drops those elements. That's why you need the buffer. Theoretically, the buffer's .byRequest mode would mirror that downstream demand upstream, which is what you want: you'd get the values published in response to the AsyncPublisher's demand from the for await loop, plus any values published without demand, while a well-behaved upstream would never see more than .max(1) demand and wouldn't flood you. Instead, buffer always sends Demand.unlimited upstream, which causes the upstream to supply values as fast as it can.
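One way to check point 1 in code is to put handleEvents(receiveRequest:) between the upstream and the buffer and record what demand arrives. The sketch below is only an illustration: OneAtATimeSubscriber and upstreamDemands(prefetch:) are hypothetical names, and the subscriber merely imitates the single .max(1) request that AsyncPublisher is described as making above.

```swift
import Combine

// Hypothetical subscriber that asks for exactly one element and never more,
// standing in for a single call to next() on an AsyncPublisher.
final class OneAtATimeSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private var subscription: Subscription?

    func receive(subscription: Subscription) {
        self.subscription = subscription
        subscription.request(.max(1))
    }

    func receive(_ input: Int) -> Subscribers.Demand { .none }

    func receive(completion: Subscribers.Completion<Never>) {
        subscription = nil
    }
}

// Returns every demand that reached the upstream side of buffer().
func upstreamDemands(prefetch: Publishers.PrefetchStrategy) -> [Subscribers.Demand] {
    var demands: [Subscribers.Demand] = []
    let upstream = PassthroughSubject<Int, Never>()
    upstream
        .handleEvents(receiveRequest: { demands.append($0) })
        .buffer(size: 4, prefetch: prefetch, whenFull: .dropOldest)
        .subscribe(OneAtATimeSubscriber())
    return demands
}

print("byRequest:", upstreamDemands(prefetch: .byRequest))
print("keepFull: ", upstreamDemands(prefetch: .keepFull))
```

If .byRequest mirrored downstream demand, the first line would show only max(1); anything larger means the buffer is asking the upstream for more than the subscriber requested.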

Thanks a lot for the clear explanation. Please bear with me while I try to understand this better.

I am not sure why a buffer would send Demand.unlimited. Wouldn't it make more sense for the buffer implementation to send Demand.max(<buffer size>)?

I am assuming that even if it does send Demand.unlimited, it would be fine as long as the buffer size is small or reasonable, since only that many elements would be stored.

So is it fair to say that as long as we set the buffer to a reasonable size (whatever is suitable), it is fine to use the publisher's buffer?

It's possible that buffer only sends .unlimited if the buffer itself is infinite; I don't know. You'd have to experiment. The right compromise might also depend on the exact upstream publisher (e.g. a CombineLatest3 can supply up to 3 elements from a .max(1) demand, and a PassthroughSubject always ignores demand and can supply arbitrarily many elements).

@somu, perhaps this could help you:

I made a Publisher extension that replaces .values with a version that doesn't miss elements: .stream

This is really nice & simple, and does solve some of the problems with buffer, but it still has 2 of the problems:

  • it creates an infinite buffer (easily solvable by passing in AsyncStream's buffering parameters)
  • it creates unlimited upstream demand
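For the first point, here is a minimal sketch of what passing AsyncStream's buffering policy through could look like. This is not the extension linked above: bufferedStream, CancellableBox, and the default of .bufferingNewest(16) are assumptions made for illustration, and it is limited to publishers whose Failure is Never.

```swift
import Combine

// Hypothetical wrapper so the cancellable can be captured by the
// @Sendable onTermination closure; AnyCancellable itself is not Sendable.
private final class CancellableBox: @unchecked Sendable {
    let cancellable: AnyCancellable
    init(_ cancellable: AnyCancellable) { self.cancellable = cancellable }
}

extension Publisher where Failure == Never {
    // Bridges the publisher into an AsyncStream with a caller-chosen bound.
    // .bufferingNewest(n) keeps the n most recent elements and discards older ones.
    func bufferedStream(
        bufferingPolicy: AsyncStream<Output>.Continuation.BufferingPolicy = .bufferingNewest(16)
    ) -> AsyncStream<Output> {
        AsyncStream(Output.self, bufferingPolicy: bufferingPolicy) { continuation in
            // sink still requests unlimited demand from the upstream.
            let cancellable = self.sink(
                receiveCompletion: { _ in continuation.finish() },
                receiveValue: { continuation.yield($0) }
            )
            let box = CancellableBox(cancellable)
            // Cancel the Combine subscription when the stream is torn down.
            continuation.onTermination = { _ in box.cancellable.cancel() }
        }
    }
}
```

This bounds memory use, but because it is built on sink it does nothing about the second point: the upstream still sees unlimited demand.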