Confused about behaviour of switchToLatest in Combine

I've been trying to replicate flatMapLatest from RxSwift in Combine, I've read in a few places that the solution is to use .map(...).switchToLatest

I'm finding some differences between the two, and I'm not sure if it's my implementation/understanding which is the problem.

In RxSwift if the upstream observable emits a stop event (completed or error) then the downstream observables created in the flatMapLatest closure will continue to emit events until they themselves emit a stop event:

let disposeBag = DisposeBag()

func flatMapLatestDemo() {
    let mockTrigger = PublishSubject<Void>()
    let mockDataTask = PublishSubject<Void>()

    mockTrigger
        .flatMapLatest { mockDataTask }
        .subscribe(onNext: { print("RECEIVED VALUE") })
        .disposed(by: disposeBag)

    mockTrigger.onNext(())
    mockTrigger.onCompleted()

    mockDataTask.onNext(()) // -> "RECEIVED VALUE" is printed
}

This same setup in Combine doesn't behave the same way:

var cancellables = Set<AnyCancellable>()

func switchToLatestDemo() {
    let mockTrigger = PassthroughSubject<Void, Never>()
    let mockDataTask = PassthroughSubject<Void, Never>()

    mockTrigger
        .map { mockDataTask }
        .switchToLatest()
        .sink { print("RECEIVED VALUE") }
        .store(in: &cancellables)

    mockTrigger.send(())
    mockTrigger.send(completion: .finished)

    mockDataTask.send(()) // -> Nothing is printed, if I uncomment the finished event above then "RECEIVED VALUE" is printed
}

Is this intentional? If so, how do we replicate the behaviour of flatMapLatest in Combine?

If it's not intentional, file a radar I guess?

1 Like

Here's a good article that clarifies the difference:

1 Like

I understand the difference between them, I'm looking for a way to replicate the behaviour of Rx flatMapLatest in Combine. There are a few articles/cheatsheets I've found which suggest that switchToLatest is the best candidate but as I've demonstrated in my code samples we're still missing some functionality.

I guess what I'm looking for is a way to transform an upstream publisher into another publisher and not have the stop event from the upstream cause the subscription to be cancelled unless all the downstream publishers created in the map closure emit stop events. Which is one of the behaviours of flatMap in Rx.

I'm not familiar with RxSwift.
But you can try the following code to see if this is what you want.

func switchToLatestDemo() {
    
    let mockTrigger = PassthroughSubject<String, Never>()
    
    let mockDataTask = PassthroughSubject<String, Never>()
    
    let flatMapLatestSubject = PassthroughSubject<AnyPublisher<String, Never>, Never>()
    
    flatMapLatestSubject
        .eraseToAnyPublisher()
        .switchToLatest()
        .sink { print($0) }
        .store(in: &cancellables)

    // Switch to mockTrigger.
    flatMapLatestSubject.send(mockTrigger.eraseToAnyPublisher())
    
    mockTrigger.send("mockTrigger: 1")
    
    mockTrigger.send(completion: .finished)

    mockTrigger.send("mockTrigger: !!!!!!")
    
    mockDataTask.send("mockDataTask: 1")
    
    // Switch to mockDataTask.
    flatMapLatestSubject.send(mockDataTask.eraseToAnyPublisher())
    
    mockTrigger.send("mockTrigger: 2")
    
    mockDataTask.send("mockDataTask: 2")
    
}

The result will be

mockTrigger: 1
mockDataTask: 2

To be honest the RxSwift example looks like it has a bug. If mockTrigger is completed flatMapLatest should be shut down. For some reason it's not even though the upstream is dead by that time. I would actually expect exactly what Combine is doing in terms of switchToLatest: upstream died, switchToLatest is shut down, sink receives completion event, sending anything into mockDataTask should end up in the void.

I think you're correct. It would be worth it to file an issue with RxSwift to verify. It might also be useful to use a different Rx library to test for the expected behavior (though, that might lead to a false positive since it, too, could have a bug?)

I just checked the implementation of RxSwift and it looks like under the hood they check if the latest observable is known and if it is the completion won't cause the dispose of the Switch operator. This however is not mentioned in the description of the operator: http://reactivex.io/documentation/operators/switch.html

@mikevelu could you file a bug report on RxSwift's GitHub page please?


This particular code path prevents the correct disposal: https://github.com/ReactiveX/RxSwift/blob/master/RxSwift/Observables/Switch.swift#L114

However the completion line of the upstream is not forwarded in the marble diagram either. :thinking: :confused:

This is a confusing situation. The above :point_up: makes less sense to me because:

boolObservable
  .flatMapLatest { condition in 
    if condition {
      return otherObservable
    } else {
      return .empty()
    }
  }

Something like this is common to be able to shut down the previous inner subscription and enter an idle state. If the inner observable completes, it won't cause the whole stream to complete. Therefore the whole stream can and should complete as soon as the upstream completes. (My guess is also that we should not wait until the flattened stream completes as it can be infinite and likely cause unexpected bugs in the program. I second the Combine behavior from the original post and personally view the RxSwift behavior as a bug here.)

@DevAndArtist @clayellis

This isn't an issue with the Rx implementation, it is behaving as expected. See this video for an in depth explanation of the operator (selectMany was the original name for flatMap):

4 Likes

Well then I must admit that I missed that critical point for all these years. :dizzy_face: If you flatten an infinite observable using flatMapLatest or even flatMap and the upstream completes you completely loose any control of the completeness of the flattened observable (unless you own it) or you dispose/cancel the entire chain.

Maybe @Tony_Parker can tell us about the behavior we see here with Combine.

Same could be said of any subscription to an infinite observable though, no? The fact you're creating the observable inside the flatMap or flatMapLatest closure isn't that significant a detail in my eyes.

(In passing — that video was excellent. Is there a directory for more of those?)

3 Likes

If my ears don't lie to me it sounds like the guy speaking in the back is Erik Meijer.

1 Like

More videos from the same series here:

https://channel9.msdn.com/Blogs/J.Van.Gogh

This is also one of the bests I’ve had recommended to me for fundamentals:

5 Likes

This might suit your need:

extension Publisher {

   func flatMapLatest<T: Publisher>(_ transform: @escaping (Self.Output) -> T) -> AnyPublisher<T.Output, T.Failure> where T.Failure == Self.Failure {
       return map(transform).switchToLatest().eraseToAnyPublisher()
   }
}

That's just wrapping what I'm doing already in the code I posted?

There is a possibility this is a bug. I'll look into it.

5 Likes

As Mike mentioned, this isn't a bug in RxSwift. This is how disposal of flattened publishers is supposed to happen based on the definition of that operator.

1 Like

Well I admitted my misunderstanding of the full behavior of this operator a little later in this thread. ;)

1 Like

Sorry, I only now read the entire thread :P

1 Like
Terms of Service

Privacy Policy

Cookie Policy