How to use Combine publisher with Swift concurrency? (Publisher.values could miss events)

The Publisher documentation explain that :

The property values exposes a publisher’s elements as an AsyncSequence , allowing you to iterate over them with for -await -in rather than attaching a Subscriber
What I understood when I read that is that values rocks and I must use it.

The problem is that using values property I miss some events in my application.
Let's try this code:

var subject = PassthroughSubject<Int, Never> ()
var j = 0
var k = 0

Task {
    for await value in subject.values {
        print ("await \(value)")
        j += 1
    }
}

subject.sink { value in
    print ("sink \(value)")
    k += 1
}


for i in 0...1000 {
    subject.send(i)
}

print ("> j \(j)")
print ("> k \(k)")

This print:

await 0
sink 0
...
sink 1000
await 1000
> j 924
> k 1001

The for-await loop miss some events.
My explanation is that as PassthroughSubject doesn't buffer anything, if an event is published during the for-await block, it is lost. Am I right ?

Instead of this, you can use AsyncStream which have an Int.max buffer size:

var subject = PassthroughSubject<Int, Never> ()
var k = 0
var l = 0

let stream = AsyncStream { continuation in
    let cancellable = subject.sink { continuation.yield($0) }
    continuation.onTermination = { continuation in
        cancellable.cancel()
    }
}

Task {
    for await value in stream {
        print ("await \(value)")
        l += 1
    }
}
subject.sink { value in
    print ("sink \(value)")
    k += 1
}


for i in 0...1000 {
    subject.send(i)
}

print ("> k \(k)")
print ("> l \(l)")

Which give the wanted result:

> k 1001
> l 1001

My question how values is implemented and why this property is the way recommended by the documentation when it is "dangerous" for those who do not know that we can lose events with it?

1 Like

You're only "losing" events due to a race condition in your code - you're starting to send events before you start listening for them, as your listener Task spawns & runs asynchronously and may not begin running until after you start your generator loop on the main thread.

Using AsyncStream to buffer the events is working around that by installing a listener (the AsyncStream) strictly before your start generating events. You don't necessarily need to do that in real-world code, depending on how much time you have before events start generating (typically "enough" in GUI applications) or if you have other means of ensuring your listener is ready before events are generated.

The behaviour is the same as for using sink on your subject - if you don't install that sink before generating events, you won't see those events there either. It's just that in your example code you're synchronously installing the sink before generating events, so it's always ready.

Unfortunately there's no great way (that I'm aware of) to know that your listener is ready (i.e. has actually started asynchronously iterating subject.values).

1 Like

Thanks for your reply @wadetregaskis .

You are right about the possible race condition due to a possible delay before for await loop was installed. But I'm afraid it's not the problem here because as you can see in the result there is await 0 printed.

Some also tell me that it might be a race condition in j += 1.

To avoid all this problems, I made another version and I still notice that with for await value in subject.values some events are missing.

actor CounterActor {
    var value: Int
    
    init(_ initialValue: Int) {
        value = initialValue
    }
    
    func increase() {
        value += 1
    }
}

var subject = PassthroughSubject<Int, Never> ()
var cancellable: AnyCancellable?
var j = CounterActor(0)
var k = CounterActor(0)

Task {
    for await value in subject.values {
        print ("await \(value)")
        await j.increase()
    }
}

cancellable = subject.sink { value in
    print ("sink \(value)")
    Task {
        await k.increase()
    }
}

Task {
    //wait to be sure for await is ready
    try? await Task.sleep(for: .seconds(1.0))
    for i in 0...1000 {
        subject.send(i)
    }
    
    //wait to be sure last send have been dispatched
    try? await Task.sleep(for: .seconds(1.0))
    print ("> j \(await j.value)")
    print ("> k \(await k.value)")
}

This still print:

> j 944
> k 1001
1 Like

Yeah, looks like Combine's provided AsyncSequence is buggy. When I run your code it prints await 0 and never anything more from that task. If I force a 1ms sleep in between sends into the publisher, then it works as expected.

You could file a bug report with Apple, but in any case it looks like you'll have to reimplement the bridge to Structured Concurrency by having your sink closure push values into an AsyncStream. e.g.:

import Combine

var subject = PassthroughSubject<Int, Never> ()
var cancellable: AnyCancellable?
var j = 0
var k = 0

let stream = AsyncStream { continuation in
    cancellable = subject.sink { _ in
        continuation.finish()
    } receiveValue: {
        print ("sink \($0)")
        k += 1
        continuation.yield($0)
    }
}

let reader = Task {
    for await value in stream {
        print ("await \(value)")
        j += 1
    }

    return j
}

let writer = Task {
    var sent = 0

    for i in 0...1000 {
        subject.send(i)
        sent += 1
    }

    subject.send(completion: .finished)

    return sent
}

print("Sent \(await writer.value), received \(await reader.value) via async sequence (\(k) via sink).")

I filled a bug report in Feedback assistant: FB13156448

Did you happen to receive a response from Apple?
I'm experiencing the same issue as you.
(The link you added doesn't work.)

No news for the moment ...

In the meanwhile, I use this:


extension Publisher where Failure == Never {
    public var stream: AsyncStream<Output> {
        AsyncStream { continuation in
            let cancellable = self.sink { completion in
                continuation.finish()
            } receiveValue: { value in
                 continuation.yield(value)
            }
            continuation.onTermination = { continuation in
                cancellable.cancel()
            }
        }
    }
}


extension Publisher where Failure: Error {
    public var stream: AsyncThrowingStream<Output, Error> {
        AsyncThrowingStream<Output, Error> { continuation in
            let cancellable = self.sink { completion in
                switch completion {
                case .finished:
                    continuation.finish()
                case .failure(let error):
                    continuation.finish(throwing: error)
                }
            } receiveValue: { value in
                 continuation.yield(value)
            }
            continuation.onTermination = { continuation in
                cancellable.cancel()
            }
        }
    }
}
4 Likes

That won't work with complete checks or Swift 6.

On line cancellable.cancel():

Capture of 'cancellable' with non-sendable type 'AnyCancellable' in a @Sendable closure

I'm trying to somehow make observing possible with checks and sendable, but it just won't work for me

My last version using @preconcurrency on Combine import :

import Foundation
@preconcurrency import Combine


extension Publisher where Failure == Never {
    public var stream: AsyncStream<Output> {
        AsyncStream { continuation in
            let cancellable = self.sink { completion in
                continuation.finish()
            } receiveValue: { value in
                continuation.yield(value)
            }
            continuation.onTermination = { continuation in
                cancellable.cancel()
            }
        }
    }
}


extension Publisher where Failure: Error {
    public var stream: AsyncThrowingStream<Output, Error> {
        AsyncThrowingStream<Output, Error> { continuation in
            let cancellable = self.sink { completion in
                switch completion {
                case .finished:
                    continuation.finish()
                case .failure(let error):
                    continuation.finish(throwing: error)
                }
            } receiveValue: { value in
                continuation.yield(value)
            }
            continuation.onTermination = { continuation in
                cancellable.cancel()
            }
        }
    }
}

1 Like