Combine `.receive(on: RunLoop.main)` loses sent value. How can I make it work?

Thanks for answering. I got the point.

  • Combine is pull-based (or request/demand-based).
  • Therefore, messages cannot be pushed into downstream pipeline with no request.
  • Request happens in the downstream context.

Therefore there's no way to "request" message until downstream context starts. Am I understanding correctly?

Now I understand why this behavior cannot be fixed and why .buffer(...) makes pipeline to work as I expected. And it seems you guys don't like to change implementation for API design improvement. Then maybe further discussion about API design would be meaningless.

This is sad because just IMO, if it was push-based with checking available capacity of downstream, so upstream retained control, maybe this doesn't have to work in this way. Anyway, that could come with another downsides.

Though I think this could have been better, but I don't have proven designs or solutions, and I cannot make any more request to you guys. Thanks for your works.

Though this looks quite safe, but I cannot take this solution.

  • I treat RunLoop/GCDQ as globally mutable collection of arbitrary opaque code, I don't like to run such "potentially anything is possible" blocks before sending message that has to be first.
  • I don't want any stopping of UI thread even for 0.1 second.
  • I am not sure how long I should run run-loop to "guarantee" processing of all queued messages in run-loop.
  • In my experience, relying on this kind of solution increases potential of subtle bugs a lot on later stage of development. I am going to be away as much as possible from this kind of solutions.

Thanks for the suggestion anyway.

For now, I just use .buffer(...).receive(on:) to make my pipeline to work as I desired. I think this would be enough for me. Taking potential message loss is painful, but it seems it's time to compromise.

I like to refer to Combine as "dynamic push/pull" rather than just "pull" or "push" (this terminology is from the Reactive Streams specification).

Values can be "pushed" but only after they have first been requested. This difference is critical to ensure sane memory performance.

I didn't mean to suggest that you should literally run the run loop for .1 seconds any time you use receive(on:). I simply ran the run loop there to prove the point - the work to deliver the subscription to sink is enqueued on the run loop (like your other "arbitrary opaque code"), so it must be run somehow.

1 Like

All of that said, this is clearly an area that causes confusion. We're going to look into it some more and see what might help.

1 Like

And do you think, @Tony_Parker, that a solution based on @luoxiu's idea, is not only correct, but recommendable?

This gist, which can be run in a playground, introduces a new Publisher.receiveValue(on:options:) Publisher method.

And it successfully prints "233", as expected by @Hoon_H.

The difference with the stock Combine Publisher.receive(on:options:) method is that only values and completion are re-scheduled. Subscriptions are not.

1 Like

Oh yes, please :-)

For example, I don't know how to interpret the documentation of Publisher.receive(on:options:). It says:

A publisher that delivers elements using the specified scheduler.

Nothing about the re-scheduling of subscriptions yet.

You use the receive(on:options:) operator to receive results on a specific scheduler, such as performing UI work on the main run loop.

More focus on values. At that point, the re-scheduling of subscriptions becomes quite unexpected.

In contrast with subscribe(on:options:) , which affects upstream messages, receive(on:options:) changes the execution context of downstream messages.

After this sentence, you could bet that subscriptions are totally not affected. But we learned in this thread that this is wrong. I guess the word "messages" includes the subscription... but this is really really misleading.

1 Like

The word 'messages' includes all calls that come from that publisher to its downstream, which include Subscriber.receive(subscription:), Subscriber.receive(_:) and Subscriber.receive(completion:).

All right. May I ask if we can now jump over the lessons, and attempt at applying what what we have learned so far?

What do you think of the various solutions we have seen so far to the question asked by the the OP ?

  • buffer operator
  • RunLoop.run
  • Hand-made receiveValue(on:)

Because,well... it's important to have theory and practice meet at some point ;-)

1 Like

We need some more time to consider each potential approach and how that plays out across the entire design (including every other operator that uses Scheduler). Sorry I don't have an absolute answer for you, but I want to make sure we are confident about what we suggest here.

1 Like

Just a note.
.buffer has unexpected (IMO) side effect on attach/detach subscriber from publisher. That it explained here.

Therefore, I can't rely on .buffer anymore and I think .buffer(...) should be removed from the options.

Thank you Tony and Lily :-). We users are also trying to wrap our head around Combine, and reprogram our existing expectations about reactive libraries. Thank you very much for reading those forums.

5 Likes

I've noticed a similar behavior (likely for the same reasons and solved by the same suggestion using buffer) when using delay(for:scheduler:). In this example the behavior only manifests with Fail (since it's sending a completion event and not a value) but not when using Just.

This will never print:

Fail(outputType: Void.self, failure: MyError.fail)
    .delay(for: 1, scheduler: DispatchQueue.main)
    .sink(receiveCompletion: { completion in
        switch completion {
        case .finished: print("Finished")
        case .failure(let error): print("Error \(error)")
        }
    }, receiveValue: { value in
        print("Value: \(value)")
    })
    .store(in: &subscriptions)

// Never prints

Whereas this will:

Just(1)
    .delay(for: 1, scheduler: DispatchQueue.main)
    .sink(receiveCompletion: { completion in
        switch completion {
        case .finished: print("Finished")
        case .failure(let error): print("Error \(error)")
        }
    }, receiveValue: { value in
        print("Value: \(value)")
    })
    .store(in: &subscriptions)

// Prints (after 1 second):
// Value: 1
// Finished

I'm sharing this here because this is another example of what looks like an intuitive Combine pipeline but one whose results are anything but. (That is, unless you take a deeper dive to understand the contracts and theory.)

To a newcomer, knowing why the one will print but the other never will could be a source of incredible frustration. Arguably, both pipelines should do what their operators describe.

(Filed feedback for this specific example: FB7298518)

2 Likes

I agree. Gotchas like these are one of the reasons why I really don't like using reactive frameworks, even if I design my apps reactively.

2 Likes

It's important that we provide feedback to Apple people that come here and read our messages. Not only with anger and frustration, even if we feel borh feelings very deeply with Combine. I have the toughest difficulties testing my pipelines. I write dispatch-queues sensitive code and Combine sometimes betrays me. It's very hard to come up with minimal sample code that displays the surprising/inconsistent behaviors of Combine. So hard I could not yet open a positive radar. But only bug reports will make Apple engineer inflect their "behaved as expected" mantra.

2 Likes

I've been wondering a similar surprising behaviour with delay(for:scheduler:). Seems like it doesn't delay the completion in the case of Empty either. I'm unsure if that's by design.

Failure and completion events should not be delayed but propagated down the pipeline as soon as possible. If you want to delay such an event you have to catch it, materialize (not sure combine has such a hook), then delay the new pipeline and finally de-materialize.

1 Like

Right, I understand that. But most newcomers won't at first — I was just pointing out that what looks like an intuitive pipeline isn't (unless you understand the difference between completion events and regular values and the way that they're treated — which, yes, users of Combine (and other reactive frameworks) should take the time to understand.)

It would be useful to introduce a delayCompletion(for:scheduler:) operator that would delay completion events. It would sit alongside delay(for:scheduler:) in code completion prompting/reminding the user that completion events are treated differently and won't be delayed by the delay operator when they reach for it.

2 Likes

Matt Gallagher has been doing a rather deep dive into Combine on his blog (part 1, part 2, part 3) which may shed some more detailed light on the behaviors seen in this thread. Namely, part 3 deals with the exact same receive issues discussed here, along with many other async scenarios using Combine. He specifically recommends four things:

  1. Subscription and other “black boxes” should be fully documented (we shouldn’t be guessing about thread safety and graph lifecycles)
  2. support buffered subjects and other ways of sharing cached computations
  3. support scenarios where demand must never be zero
  4. receive(on:) should synchronously establish initial demand (only subscribe(on:) should asynchronously complete construction)

As an aside, to me, open sourcing Combine could take care of #1, or at least make it possible for the community to contribute documentation.

6 Likes

If failure and completion are sent asynchronously from values, then it would be easy to "lose" the last values sent before a completion. Setup (receive(subscription:)) is a bit of a different case since here since we're talking about if it's first & synchronous or first & asynchronous.

1 Like

Hello all,

As of developer beta 1 of iOS 13.3 (and associated releases for other platforms), we've changed the behavior of receive(on:) plus other Scheduler operators to synchronously send their subscription downstream. Previously, they would "async" it to the provided scheduler.

This means that the test case in the original post of this thread now does what most would expect (print the value). Since the subscription is received synchronously by sink, and it synchronously requests .unlimited from its upstream, there is no opportunity for any values sent to the PassthroughSubject to be dropped.

If you have access to the developer betas, please give it a try with your own Combine scenarios and let me know how it goes.

I'd like to extend a special thank you to everyone here and elsewhere across the web who provided us valuable feedback on this.

25 Likes

Hi @Tony_Parker I am facing a similar issue, stated below is the example:

Xcode 11.2.1 (11B500)

Problem:

When .receive(on: RunLoop.main) is used sink doesn't receive any values, however when it is commented out, then sink receives values.

Note: I have tried the following but still doesn't work:

  • Retained cancellable
  • Tested on iOS Device / Simulator / Playground.

Code:

import Foundation
import Combine

extension Notification.Name {
    public static let didPost = Notification.Name("didPost")
}

func postNotifications() {
    
    let names = ["aaa", "bbb", "ccc"]
    
    for name in names {
        
        NotificationCenter.default.post(name: .didPost,
                                        object: nil,
                                        userInfo: ["Car" : name])
    }
}

let cancellable = NotificationCenter.default.publisher(for: .didPost)
    .compactMap { $0.userInfo?["Car"] as? String }
    .receive(on: RunLoop.main) //works when commented out
    .sink {
        print("sink: \($0)")
}

postNotifications()
Terms of Service

Privacy Policy

Cookie Policy