The property values exposes a publisher’s elements as an AsyncSequence , allowing you to iterate over them with for -await -in rather than attaching a Subscriber
What I understood when I read that is that values rocks and I must use it.
The problem is that using values property I miss some events in my application.
Let's try this code:
var subject = PassthroughSubject<Int, Never> ()
var j = 0
var k = 0
Task {
for await value in subject.values {
print ("await \(value)")
j += 1
}
}
subject.sink { value in
print ("sink \(value)")
k += 1
}
for i in 0...1000 {
subject.send(i)
}
print ("> j \(j)")
print ("> k \(k)")
The for-await loop miss some events.
My explanation is that as PassthroughSubject doesn't buffer anything, if an event is published during the for-await block, it is lost. Am I right ?
Instead of this, you can use AsyncStream which have an Int.max buffer size:
var subject = PassthroughSubject<Int, Never> ()
var k = 0
var l = 0
let stream = AsyncStream { continuation in
let cancellable = subject.sink { continuation.yield($0) }
continuation.onTermination = { continuation in
cancellable.cancel()
}
}
Task {
for await value in stream {
print ("await \(value)")
l += 1
}
}
subject.sink { value in
print ("sink \(value)")
k += 1
}
for i in 0...1000 {
subject.send(i)
}
print ("> k \(k)")
print ("> l \(l)")
Which give the wanted result:
> k 1001
> l 1001
My question how values is implemented and why this property is the way recommended by the documentation when it is "dangerous" for those who do not know that we can lose events with it?
You're only "losing" events due to a race condition in your code - you're starting to send events before you start listening for them, as your listener Task spawns & runs asynchronously and may not begin running until after you start your generator loop on the main thread.
Using AsyncStream to buffer the events is working around that by installing a listener (the AsyncStream) strictly before your start generating events. You don't necessarily need to do that in real-world code, depending on how much time you have before events start generating (typically "enough" in GUI applications) or if you have other means of ensuring your listener is ready before events are generated.
The behaviour is the same as for using sink on your subject - if you don't install that sink before generating events, you won't see those events there either. It's just that in your example code you're synchronously installing the sink before generating events, so it's always ready.
Unfortunately there's no great way (that I'm aware of) to know that your listener is ready (i.e. has actually started asynchronously iterating subject.values).
You are right about the possible race condition due to a possible delay before for await loop was installed. But I'm afraid it's not the problem here because as you can see in the result there is await 0 printed.
Some also tell me that it might be a race condition in j += 1.
To avoid all this problems, I made another version and I still notice that with for await value in subject.values some events are missing.
actor CounterActor {
var value: Int
init(_ initialValue: Int) {
value = initialValue
}
func increase() {
value += 1
}
}
var subject = PassthroughSubject<Int, Never> ()
var cancellable: AnyCancellable?
var j = CounterActor(0)
var k = CounterActor(0)
Task {
for await value in subject.values {
print ("await \(value)")
await j.increase()
}
}
cancellable = subject.sink { value in
print ("sink \(value)")
Task {
await k.increase()
}
}
Task {
//wait to be sure for await is ready
try? await Task.sleep(for: .seconds(1.0))
for i in 0...1000 {
subject.send(i)
}
//wait to be sure last send have been dispatched
try? await Task.sleep(for: .seconds(1.0))
print ("> j \(await j.value)")
print ("> k \(await k.value)")
}
Yeah, looks like Combine's provided AsyncSequence is buggy. When I run your code it prints await 0 and never anything more from that task. If I force a 1ms sleep in between sends into the publisher, then it works as expected.
You could file a bug report with Apple, but in any case it looks like you'll have to reimplement the bridge to Structured Concurrency by having your sink closure push values into an AsyncStream. e.g.:
import Combine
var subject = PassthroughSubject<Int, Never> ()
var cancellable: AnyCancellable?
var j = 0
var k = 0
let stream = AsyncStream { continuation in
cancellable = subject.sink { _ in
continuation.finish()
} receiveValue: {
print ("sink \($0)")
k += 1
continuation.yield($0)
}
}
let reader = Task {
for await value in stream {
print ("await \(value)")
j += 1
}
return j
}
let writer = Task {
var sent = 0
for i in 0...1000 {
subject.send(i)
sent += 1
}
subject.send(completion: .finished)
return sent
}
print("Sent \(await writer.value), received \(await reader.value) via async sequence (\(k) via sink).")
extension Publisher where Failure == Never {
public var stream: AsyncStream<Output> {
AsyncStream { continuation in
let cancellable = self.sink { completion in
continuation.finish()
} receiveValue: { value in
continuation.yield(value)
}
continuation.onTermination = { continuation in
cancellable.cancel()
}
}
}
}
extension Publisher where Failure: Error {
public var stream: AsyncThrowingStream<Output, Error> {
AsyncThrowingStream<Output, Error> { continuation in
let cancellable = self.sink { completion in
switch completion {
case .finished:
continuation.finish()
case .failure(let error):
continuation.finish(throwing: error)
}
} receiveValue: { value in
continuation.yield(value)
}
continuation.onTermination = { continuation in
cancellable.cancel()
}
}
}
}
My last version using @preconcurrency on Combine import :
import Foundation
@preconcurrency import Combine
extension Publisher where Failure == Never {
public var stream: AsyncStream<Output> {
AsyncStream { continuation in
let cancellable = self.sink { completion in
continuation.finish()
} receiveValue: { value in
continuation.yield(value)
}
continuation.onTermination = { continuation in
cancellable.cancel()
}
}
}
}
extension Publisher where Failure: Error {
public var stream: AsyncThrowingStream<Output, Error> {
AsyncThrowingStream<Output, Error> { continuation in
let cancellable = self.sink { completion in
switch completion {
case .finished:
continuation.finish()
case .failure(let error):
continuation.finish(throwing: error)
}
} receiveValue: { value in
continuation.yield(value)
}
continuation.onTermination = { continuation in
cancellable.cancel()
}
}
}
}