'sink' is never called when using 'receive'

I noticed that 'sink' is never called when using 'receive'. I tested on Xcode 11 beta 5.

public enum SomeError: Error {
    case someError
}

Just("1")
.tryMap { val -> String in
    if Bool.random() { throw SomeError.someError }
    return val
}
.flatMap { val -> Empty<String, Error> in
    return .init()
}
.receive(on: RunLoop.main)
.sink(receiveCompletion: { completion in
    print(1, completion)
    switch completion {
    case .failure(let error):
        print(3, error)
    case .finished:
        break
    }
}) { val in
    print(2, val)
}

The sink modifier returns an AnyCancellable. When the AnyCancellable is destroyed, it cancels the subscription. In your example, you're not storing the AnyCancellable, so it is destroyed immediately, cancelling the subscription immediately.
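To make the lifetime point concrete, here is a minimal sketch, not taken from the original post. The names `log` and `token` are hypothetical, and it assumes a script or playground where the main run loop can be spun manually. It contrasts a synchronous pipeline, where `Just` delivers inside the call to `sink`, with one that hops through `receive(on:)` and therefore delivers on a later run-loop turn.

```swift
import Combine
import Foundation

// Hypothetical log used only to record which closures ran.
var log: [String] = []

// Synchronous: Just delivers its value during the call to sink,
// before the returned AnyCancellable can be released.
_ = Just("1").sink { log.append("sync \($0)") }

// Asynchronous: receive(on:) defers delivery to a later run-loop turn.
// The token is discarded right away, so the subscription is cancelled first.
_ = Just("2").receive(on: RunLoop.main).sink { log.append("dropped \($0)") }

// Same pipeline, but the token is kept alive until delivery has happened.
let token = Just("3").receive(on: RunLoop.main).sink { log.append("kept \($0)") }

// Give the main run loop a chance to deliver the scheduled events.
RunLoop.main.run(until: Date().addingTimeInterval(0.2))
print(log)
```

This would also explain why removing `receive(on:)` can appear to fix an unretained pipeline: without the scheduler hop, everything may already have been delivered by the time the token is released.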

I've changed my code to start with let requestCancellable = Just("1"), but 'sink' still isn't invoked.
When I remove .receive(on: RunLoop.main), 'sink' starts being invoked.

If I remove either 'flatMap' or 'receive', 'sink' starts being invoked. I can't understand how this relates to AnyCancellable. I've written some simple code to test it.

let requestCancellable = Just("1")
.flatMap { val -> Empty<String, Never> in
    return .init()
}
.receive(on: RunLoop.main)
.sink(receiveCompletion: { completion in
    print(1, completion)
}) { val in
    print(2, val)
}

It's entirely possible that by shortening the pipeline, you're winning the race between the deinitialization of your AnyCancellable and sink being called. Have you ensured its lifetime is properly managed?

I save requestCancellable in my store object and my view uses this store.

sink is never invoked.

struct MainView : View {
    var requestCancellable: AnyCancellable
    
    init() {
        requestCancellable = Just("1")
        .flatMap { val -> Empty<String, Never> in
            return .init()
        }
        .receive(on: RunLoop.main)
        .sink(receiveCompletion: { completion in
            print(1, completion)
        }) { val in
            print(2, val)
        }
    }
    
    var body: some View {
        VStack {
            EmptyView()
        }
    }
}

I reproduced the problem in a playground using Xcode 11b5 on macOS 10.14.6. I think it's a Combine framework bug.

How do you manage the lifetime other than storing a reference to the AnyCancellable?

I was also able to reproduce this with Xcode Version 12.1 (12A7403):

publisher
    // either of these
    .receive(on: DispatchQueue.main)
    .receive(on: RunLoop.main)

    .sink(receiveValue: updateState(_:))
    .store(in: &subscribers)

and updateState is never called.

That's correct: when I remove the .receive call, the sink() part is actually called.

I just ran into this issue today with something similar. Here is my example with some operators like map removed for simplicity. The chain looks like this:

urlSession.dataTaskPublisher(for: cdnIndexUrl)
    .receive(on: DispatchQueue.main)
    .receive(on: DispatchQueue.global())
    .receive(on: DispatchQueue.main)
    .sink(receiveCompletion: { _ in }, receiveValue: { _ in
        // Never called
    })
    .store(in: &cancellables)

I could verify the dataTaskPublisher publishes a value but then the later publishers complete before sink has had a chance to be called. (Xcode 16.2)

Why does it happen?

Here is my take on why this is happening, and why it is not an issue with Combine:

When using a global queue (i.e. a concurrent, non-serial queue), there is a chance the completion is delivered before the value, especially if something takes time to finish before the value can be sent. This is the nature of concurrent queues.

What can be done?

The best thing to do here is to create a serial queue, which targets one of the background queues:

urlSession.dataTaskPublisher(for: cdnIndexUrl)
    .receive(on: DispatchQueue.main)
    .receive(on: DispatchQueue(label: "serial", target: .global()))
    .receive(on: DispatchQueue.main)
    .sink(receiveCompletion: { _ in }, receiveValue: { _ in
        // FIXED !
    })
    .store(in: &cancellables)
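To check the ordering claim in isolation, here is a small sketch under some assumptions: `Just(42)` stands in for the network publisher, the queue label "example.serial" and the names `events` and `done` are hypothetical, and the semaphore is there only to keep a script alive until the completion arrives. Because the queue is serial, the work items that `receive(on:)` schedules for the value and for the completion run in the order they were enqueued.

```swift
import Combine
import Foundation

// Hypothetical label; a serial queue that targets a global (concurrent) queue.
let serial = DispatchQueue(label: "example.serial", target: .global())

let lock = NSLock()                       // guards `events` across threads
var events: [String] = []                 // records the order of delivery
var cancellables = Set<AnyCancellable>()  // keeps the subscription alive
let done = DispatchSemaphore(value: 0)    // signalled when the completion arrives

Just(42)
    .receive(on: serial)
    .sink(receiveCompletion: { _ in
        lock.lock(); events.append("completion"); lock.unlock()
        done.signal()
    }, receiveValue: { value in
        lock.lock(); events.append("value \(value)"); lock.unlock()
    })
    .store(in: &cancellables)

// Block the calling thread until the completion has been recorded.
done.wait()
print(events)
```

Swapping `serial` for `DispatchQueue.global()` removes the ordering guarantee, which is the race described above; whether it actually reorders on a given run is not deterministic.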

Another way would be to use flatMap to return a new publisher. When the initial publisher completes, it won't complete the overall stream. But that's just a band-aid.