Combine `.receive(on: RunLoop.main)` loses sent value. How can I make it work?

My message gets lost if I make subject to receive on a scheduler.
Here's my code to reproduce.

import Foundation
import Combine

let control = PassthroughSubject<Int,Never>()
let pipe = control.receive(on: RunLoop.main).sink(receiveValue: { print($0) })
control.send(233)
RunLoop.main.run()
// No print!

I expected to receive 233, but it won't.
This works if I remove .receive(on: RunLoop.main).

let control = PassthroughSubject<Int,Never>()
let pipe = control.sink(receiveValue: { print($0) })
control.send(233)
RunLoop.main.run()
// Prints `233`. Works!

233 gets lost whatever scheduler I use.

let control = PassthroughSubject<Int,Never>()
let pipe = control.receive(on: DispatchQueue.main).sink(receiveValue: { print($0) })
control.send(233)
dispatchMain()
// No print.

let control = PassthroughSubject<Int,Never>()
let pipe = control.receive(on: DispatchQueue.global()).sink(receiveValue: { print($0) })
control.send(233)
dispatchMain()
// No print.

I don't know why. I though I should be able to receive 233 in this way.
But this works if I send 233 asynchronously.

let control = PassthroughSubject<Int,Never>()
let pipe = control.receive(on: RunLoop.main).sink(receiveValue: { print($0) })
DispatchQueue.main.async { control.send(233) }
RunLoop.main.run()
// Prints `233`. Works!

I am calling receive(on:) to process messages coming from multiple different publishers in strict FIFO order. In this way I expected to use RunLoop as a serial message queue. Am I doing something wrong? How can I make it work?

I’m not in a position to test this right now, but I think your problem is with the handling of pipe. This is an AnyCancellable, so releasing the last reference to it will cancel the subscription. Your code doesn’t do anything to maintain a persistence reference to pipe, so Swift releases the last reference immediately after creating it, and things cancel immediately.

Try explicitly extending the lifetime of pipe:

withExtendedLifetime(pipe) {
    RunLoop.main.run()
}

Share and Enjoy

Quinn “The Eskimo!” @ DTS @ Apple

I tried your suggestion, but it still doesn't work.

let control = PassthroughSubject<Int,Never>()
let pipe = control.receive(on: RunLoop.main).sink(receiveValue: { print($0) })
withExtendedLifetime(pipe) {
    control.send(233)
    RunLoop.main.run()
}
// No print.

All code examples are placed and tests in main.swift. I have been thought local variables declared in main.swift will survive until program exit. Doesn't it?

Also it does not explain why it works if I delete .receive(...) part.

Perhaps because you’re calling receive before running the RunLoop?

1 Like

Please add buffer operator after control and see if start to work. If it does then I also tend to say that your order is the issue because you‘re using a passthrough subject.

We can reproduce the problem using AnySubscriber instead of Sink, and that gives us more opportunities to see what's happening:

let c0 = PassthroughSubject<Int, Never>()
let c1 = c0.receive(on: RunLoop.main)

let sub = AnySubscriber<Int, Never>(receiveSubscription: {
    print("receiveSubscription \($0)")
    $0.request(.unlimited)
}, receiveValue: {
    print("receiveValue \($0)")
    return .unlimited
}, receiveCompletion: nil)

c1
    .subscribe(sub)

print("send(233)")
c0.send(233)

Here's the output:

send(233)
receiveSubscription ReceiveOn

So we can see that the send happens before sub receives the subscription. Therefore sub hasn't sent a demand to its upstream by the time of the send, so the value 233 has to be discarded before reaching sub.

The send happen before sub receives the subscription because the receive(on:) operator promises to make all calls to its downstreams on the given scheduler. This includes calling a downstream's receive(subscription:) method. The only way the receive(on:) operator can guarantee that it calls its downstream on the given scheduler is by using the scheduler's schedule(_:) method. RunLoop's schedule(_:) method apparently uses CFRunLoopPerformBlock, which always queues the block to be performed later.

You can work around this problem by inserting a buffer operator before receive(on:):

let c1 = c0
    .buffer(size: 1, prefetch: .byRequest, whenFull: .dropOldest)
    .receive(on: RunLoop.main)

You should adjust the buffer parameters based on your needs.

I don't know if this is a good workaround. It might be nice to have a receiveValuesAndCompletion(on:) that still subscribes synchronously, and it might not be too hard to implement your own version of that.

4 Likes

Thanks for the detailed writeup @mayoff. This seems like an awful gotcha. Is this behavior intentional?

I don't think that's a reason. Because RunLoop keeps queued functions and execute them well when it starts. For example, this code works.

RunLoop.main.perform { print("AAA") }
RunLoop.main.run()
// Prints "AAA". Works.

Therefore, I think that's natural to expect early receive to work.
And actually this is just an example to reproduce my issue in smallest scale, original issue happens in my AppKit app code in same manner that is already running in a RunLoop.

Adding a .buffer made it works.

let control = PassthroughSubject<Int,Never>()
let pipe = control
    .buffer(size: 1, prefetch: .byRequest, whenFull: .dropOldest)
    .receive(on: RunLoop.main).sink(receiveValue: { print($0) })
control.send(233)
RunLoop.main.run()
// Prtins `233`. It works.

Thanks for advise.
As I want to queue messages from different publishers in single buffer, .buffer doesn't seem to be a right choice for me.

And I still don't understand why my pipeline configuration have trouble.

  • RunLoop/GCDQ are supposed to queue incoming messages instead of dropping.
  • Pipeline is built to pass messages to a RunLoop/GCDQ and alive at the point of sending. Therefore I can't see a reason to lose the message.

IMO, my configuration looks to work. Maybe there are some underlying gotchas, but I think my configuration should work as it looks.

I am not sure that I am understanding Combine correctly, but it seems like receive(subscription:) is sending message of "subscription start" and receive(on:) is sending actual messages. As both of them are essentially sent messages, I think all of them should arrive in sent order. If it doesn't arrive in sent order, it sounds like a bug IMO.

No. You can think of receive(subscription:) as a “subscription start” message, but receive(on:) is not in any way a “sent message”.

The receive(on:) operator wraps a Publisher in an instance of Publishers.ReceiveOn, which is itself a Publisher. When you call receive(on:), it does not send any messages at that time. No messages are sent until after some Subscriber is subscribed to the ReceiveOn or to something downstream of the ReceiveOn.

The job of the ReceiveOn is to receive messages from upstream and pass them downstream on a specific Scheduler.

Another workaround that occurs to me now is to use the same Scheduler for sending the messages. That is:

let control = PassthroughSubject<Int,Never>()
let pipe = control.receive(on: RunLoop.main).sink(receiveValue: { print($0) })
RunLoop.main.schedule { control.send(233) }
RunLoop.main.run()

However, with this workaround, there's no reason to use receive(on:) anymore, as the messages are all going to be delivered on the main thread anyway.

Keep in mind also that your example code is a toy. It is not clear that it is applicable to a real application. In a real application, you might easily be able to set up all of your pipelines and allow a turn of the run loop (to allow the receive(subscription:) calls to finish) before the first message is sent through any subject.

@mayoff
First, I was confused with receive(on:) and receive(). I'm sorry. What I wanted to say was "start subscription"(receive(subscription:)) and "actual messages"(receive()).


Anyway, now my question is, if send happens before receiving "subscription", so actual messages arrive before "start subscription" message, is it fine? It doesn't seem to be okay to me because I expected my pipe expression finished building of processing pipeline, and it's ready to use immediately. I expected sent messages to be transferred to sink. Though I attached ReceiveOn, it wraps both of upstream/downstream pipes isn't it? I am sending messages to upstream pipe, and the upstream pipe looks like not doing its job.

To me, internal or current implementation doesn't match with how it looks. This is what I really don't understand. Don't you think so? How do you think about this mismatch? I'd like to hear your opinion on this design(?).

@Jon_Shier I suspect this is not an intension just like you pointed. If it is, I think that a bad design.


Actually, this is primary reason I am using receive(on:). I send message at any time at any place in any thread and let Combine to deal with delivery and ordering guarantee. And in real product, I do not expect any silent message loss for whatever reason. Pipeline must be ready immediately and message once pushed into pipeline must be delivered. This kind of silent message loss potentially allows subtle bugs. Such potential is a real pain in the ass.

If you are a person working in Combine team, please do not make such assumption. A few of subtle unexpected/surprising behavior exponentially increases things to consider in real world application. Please don't allow any potential.

In my app, order of messages are very important. That's why I want to enqueue all messages into single scheduler so I can process them in strict FIFO order. Also this workaround avoids a reason to use Combine. Should I deal with execution context switching manually just to avoid unexpected (or surprisingly designed) message loss...?

        let mx = ProjectManagement()
        // `mx.control` is a `PassthroughSubject`.
        mx.control.send(.restore(from: loc)) 
        projs.append(mx)

This is a part of the problem. This code is called in an AppKit menu event handler, therefore in a run-loop. The .restore message must be sent immediately right after creation of ProjectManagement. If another message sent to the ProjectManagement before .restore, it yields wrong result as there's no data loaded. It's okay to process .restore message later, but it must arrive before any other message.

        let mx = ProjectManagement()
        RunLoop.main.schedule { [weak mx] in
            mx?.control.send(.restore(from: loc)) 
        }

I can't do this because this potentially allows sending of other messages into mx.control before GCDQ executes pushed function. The problem is anyone can send message to mx.control and they potentially can use different queues. To ensure using of same queue, I have to manage queues, and that concludes to use single queue object, and that is essentially duplication of receive(on:) (if it worked as expected).

Thanks for the suggestions anyway.

1 Like

Just a note.
This is same in Xcode 11 GM. It seems this is by design. I'm so sad.

If you were familiar with Apple’s release process you’d know that being in a GM means nothing in regards to whether something’s a bug or intentional behavior. @Tony_Parker @Philippe_Hausler Can either of you tell us whether this is actually intended behavior?

3 Likes

The root cause of 'losing' the value is the PassthroughSubject. That Subject is designed specifically not to buffer any values. This keeps the performance characteristics of the type simple - it does not have the possibility of growing to a very large size in memory if it receives a lot of values.

Therefore, if PassthroughSubject receives a value before any downstream is attached (as can happen in an asynchronous scenario), it must drop it.

Using the buffer before the receive gives the PassthroughSubject a place to put the new value (because buffer is synchronously attached to that Subject).

Sometimes a buffer and Subject are combined into a hypothetical ReplaySubject, which buffers N values in order to "catch up" new subscribers. Combine does not have that type, at least not yet. We are aware of its utility and have quite a few requests to add it.

One thing we do have today is a CurrentValueSubject, which does store exactly one value:

let control = CurrentValueSubject<Int,Never>(0)
let pipe = control.receive(on: RunLoop.main).sink(receiveValue: { print($0) })
control.send(233)
RunLoop.main.run()

prints 233 for me. CurrentValueSubject's storage of 1 value is designed to handle most UI scenarios.

Hope this helps.

3 Likes

On a related note, I've created a version of CurrentValueSubject that doesn't require an initial value. I'm not sure whether my wrapper approach is the most efficient, but it was certainly easy:

final class OptionalValueSubject<Output, Failure>: Subject where Failure: Error {
    private var currentValueSubject = CurrentValueSubject<Output?, Failure>(nil)
    
    func send(_ value: Output) {
        currentValueSubject.send(value)
    }
    
    func send(completion: Subscribers.Completion<Failure>) {
        currentValueSubject.send(completion: completion)
    }
    
    func send(subscription: Subscription) {
        currentValueSubject.send(subscription: subscription)
    }
    
    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        currentValueSubject.compactMap { $0 }.receive(subscriber: subscriber)
    }
}

Okay I see your concern of scalability. We need to limit size buffer, and I agree on that.

And I got a few more questions. I hope you to help me on my questions.

Does this mean even run-loops and dispatch-queues won't buffer any message (or functions) with PassthroughSubject.receive(on:)? AFAIK, run-loops and dispatch-queues are designed to buffer any incoming messages implicitly and this makes me confused. If so, it would make sense to require attaching .buffer(...) manually. Though I don't know how it's possible but maybe there's some secret... Does this mean Combine is designed to avoid even such implicit buffering? Like stopping upstream until target execution queue finishes processing last message?

Here's how I feel.

// I attach downstream in caller (current) context.
// `msg1` must be printed as downstream is attached here
// before sending it.
subject1.receive(on: gcdq1).sink({ print($0) })
subject1.send(msg1)
// I attach downstream in target (gcdq1) context.
// `msg1` must be ignored 
// as the downstream is not attached before sending it.
gcdq1.async { subject1.receive(on: gcdq1).sink({ print($0) }) }
subject1.send(msg1)

IMO, Combine is showing second behavior for first expression.

What I don't understand is, having .receive(on:) expression "looks" like I "attached" downstream "synchronously", but it actually isn't. Actually the receive(on:) call itself happens in caller context synchronously, therefore I think my expectation makes sense. I'm not calling receive(on:) "in" target execution context.

AFAIK, the main reason is sending "subscription start" message gets sent to downstream later. And AFAIK, this is because Combine is using "subscription start" as "attachment" signal.

But here in this case, I see this doesn't work. IMO, "attachment" is not same with "subscription". Especially in asynchronous pipeline. Attachment is done in caller context when I call receive(on:) to build the pipeline, and receiving of such subscription start/value/end messages happens in other execution context asynchronously. At the point of attachment, "subscription start" has to be sent to target context, and consequent "sent value" has to be sent "after" the "subscription start" message. To me current situation looks like Combine is not sending "subscription start" message to target context because it believes "attachment" is not done at the point of calling receive(on:).

Though I don't know well about details of Combine, this doesn't seem to have any conflict with "no buffering" policy to me. No actual "sent message" need to be buffered to behave like this as "subscription start" is not a "sent messages" going through the pipeline.

This is just my opinion. I like to hear dev team opinion and direction, so I can direct future development.

This sounds like you are considering only UI use cases. I didn't say I am using this for UI. I am now using Combine as in-memory inter-component message passing channels. These components control subprocesses, call server API and aggregate data, but not really about UI. (Though final result goes to UI) As point of having this pipeline is "sending non-idempotent control/command/notification messages" rather than "representing state values", they should process everything precisely. They require absolute no message loss, duplication or shuffling on dynamic attach/detach and immediate message sending scenario. They should pass messages if attached, and should not if unattached. Only PassthroughSubject matches for this purpose, not CurrentValueSubject or ReplaySubject as I think they potentially can send unexpected messages.

Should I consider Combine wouldn't match well for this purpose? @Jon_Shier @Tony_Parker

Hi, @Hoon_H, I am also studying Combine recently, and I want to share some of my thoughts.

I’m working on CombineX currently in my spare time, which is an open source implementation of Combine. And you can see my implementation of ReceiveOn here.
Just as @mayoff said, the main reason for the problem is the following lines:

func receive(subscription: Subscription) {
    guard self.lock.withLockGet(self.state.relay(subscription)) else {
        subscription.cancel()
        return
    }
    self.schedule {  // the downstream will receive the subscription on scheduler
        self.sub.receive(subscription: self)
    }
}

So things become:

var subscribed = false

func send(_ output: Int) {
    if subscribed {
        print("send:", output)
    }
}

runLoop.perform {
    subscribed = true
}

send(1) // won't print "1" since subscribed is false now

An elegant way to solve your problem is a new operator: ReceiveValueOn, which only schedule input and completion event with the scheduler. We plan to add it to CXExtensions later, then you can take a look!


By the way! CombineX needs help very much now. If you are interested in Combine, you are welcome to join us, whether as a project manager, a developer or coder reviewer, we are all grateful! It is a great opportunity to validate and learn from Combine!

1 Like

I 100% agree. But it's not only "elegant". I think that's what many people expect (modifying the scheduling of values and completion without modifying the graph of subscriptions).

Combine is supposed to become a part of our everyday tool belt, because it feeds SwiftUI. I think Combine shouldn't require its user to master the theory and practice of reactive programming and concurrency. In this regard, my opinion is that buffer(size:prefetch:whenFull:) should remain an advanced low-level operator with rare use cases.

@Tony_Parker, what do you think? What if we see several third-party libraries coming that "plug" holes in Combine - in order to make it usable by our dear "average Joe"? I know the RxSwift maintainers spent a lot of time figuring out which operators should ship with the core lib, and which ones had to remain external extensions. Was this concern part of the design of Combine?

There's a lot here, and I'll try to answer as best I can.

receive(on:) will cause execution of all receive messages (receive(subscription:), receive(_ input: Input), receive(completion:)) on the specified Scheduler (a run loop in this case).

Ordering of messages in Combine follows a strict rule:

  • 1 subscription downstream
  • Request of N values upstream
  • N values downstream
  • 1 completion (or no completion, if infinite)

Nothing can be sent before the subscription is received downstream, because the downstream did not have an opportunity to make a request yet. Requesting values requires a subscription. The subscription arrives asynchronously (because that is what receive(on:) is defined to do, and also because everything in Combine is designed to be potentially asynchronous). In the example, the RunLoop has not yet run and therefore there is no possibility of a subscription being received by the sink operator.

Here is another way to "fix" it:

let control = PassthroughSubject<Int,Never>()
let pipe = control.receive(on: RunLoop.main).sink(receiveValue: { print($0) })
RunLoop.main.run(until: Date().addingTimeInterval(0.1)) // run the run loop for .1 seconds
control.send(233)
RunLoop.main.run() // run the run loop forever

The reason the first run loop run is required is because this code is already executing on the main thread and therefore blocks. RunLoop needs a chance to execute the enqueued work of delivering the subscription.


I think also it may help to remember that the chain of operators you're setting up are descriptions; not much is happening when you call receive(on:) besides recording your arguments in a struct. The actual "instantiation" of the chain happens when the last thing in the chain calls subscribe. sink is different than the others:

    public func sink(
        receiveCompletion: @escaping ((Subscribers.Completion<Failure>) -> Void),
        receiveValue: @escaping ((Output) -> Void))
        -> AnyCancellable 
    {
        let sink = Subscribers.Sink<Self.Output, Self.Failure>(receiveCompletion: receiveCompletion, receiveValue: receiveValue)
        self.subscribe(sink)
        return AnyCancellable(sink)
    }

So once you call it, it calls subscribe on receive(on:). In response to that, receive(on:) says "ok, here is your subscription" and sends is asynchronously like this (simplified):

scheduler.schedule(options: options) {
    ds.receive(subscription: self)
}

And when sink receives that subscription it asks it for some values:

public func receive(subscription: Subscription) {
    subscription.request(.unlimited)
}

Once the unlimited request comes in, receive sends it to the subject, at which point values can be sent safely.

Hopefully this helps clarify.

2 Likes
Terms of Service

Privacy Policy

Cookie Policy