I noticed that 'sink' never was called when using 'receive' . I tested on Xcode 11 beta 5.
public enum SomeError: Error {
case someError
}
Just("1")
.tryMap { val -> String in
if Bool.random() { throw SomeError.someError }
return val
}
.flatMap { val -> Empty<String, Error> in
return .init()
}
.receive(on: RunLoop.main)
.sink(receiveCompletion: { completion in
print(1, completion)
switch completion {
case .failure(let error):
print(3, error)
case .finished:
break
}
}) { val in
print(2, val)
}
The sink modifier returns an AnyCancellable. When the AnyCancellable is destroyed, it cancels the subscription. In your example, you're not storing the AnyCancellable, so it is destroyed immediately, cancelling the subscription immediately.
I've changed my code like this let requestCancellable = Just("1") but 'sink' isn't invoked.
When I remove .receive(on: RunLoop.main) then 'sink' start to be invoked.
If I remove 'flatMap' or 'receive' then 'sink' is started to invoke. I can't understand how it relative to AnyCancelable. I've written a simple code to test it.
let requestCancellable = Just("1")
.flatMap { val -> Empty<String, Never> in
return .init()
}
.receive(on: RunLoop.main)
.sink(receiveCompletion: { completion in
print(1, completion)
}) { val in
print(2, val)
}
It's entirely possible that by shortening the pipeline, you're winning the race between the deinitialization of your AnyCancellable and sink being called. Have you ensured its lifetime is properly managed?
I just ran into this issue today with something similar. Here is my example with some operators like map removed for simplicity. The chain looks like this:
urlSession.dataTaskPublisher(for: cdnIndexUrl)
.receive(on: DispatchQueue.main)
.receive(on: DispatchQueue.global())
.receive(on: DispatchQueue.main)
.sink {
// Never called
}
.store(on: cancellables)
I could verify the dataTaskPublisher publishes a value but then the later publishers complete before sink has had a chance to be called. (Xcode 16.2)
Why does it happen?
Here is my take on why this is happening, and why it is not be an issue with Combine:
When using a global queue (e.g. concurrent, non-serial) there is a chance the completion could happen before the value is sent, especially if there is something that takes time to complete before the value can be sent. This is the nature of concurrent queues...
What can be done?
The best thing to do here is to create a serial queue, which targets one of the background queues:
Another way would be to use flatMap to return a new publisher. When the initial publisher completes it won't complete the overall stream. But that's just a bandaid