I've been trying to replicate flatMapLatest from RxSwift in Combine, I've read in a few places that the solution is to use .map(...).switchToLatest
I'm finding some differences between the two, and I'm not sure if it's my implementation/understanding which is the problem.
In RxSwift if the upstream observable emits a stop event (completed or error) then the downstream observables created in the flatMapLatest closure will continue to emit events until they themselves emit a stop event:
let disposeBag = DisposeBag()
func flatMapLatestDemo() {
let mockTrigger = PublishSubject<Void>()
let mockDataTask = PublishSubject<Void>()
mockTrigger
.flatMapLatest { mockDataTask }
.subscribe(onNext: { print("RECEIVED VALUE") })
.disposed(by: disposeBag)
mockTrigger.onNext(())
mockTrigger.onCompleted()
mockDataTask.onNext(()) // -> "RECEIVED VALUE" is printed
}
This same setup in Combine doesn't behave the same way:
var cancellables = Set<AnyCancellable>()
func switchToLatestDemo() {
let mockTrigger = PassthroughSubject<Void, Never>()
let mockDataTask = PassthroughSubject<Void, Never>()
mockTrigger
.map { mockDataTask }
.switchToLatest()
.sink { print("RECEIVED VALUE") }
.store(in: &cancellables)
mockTrigger.send(())
mockTrigger.send(completion: .finished)
mockDataTask.send(()) // -> Nothing is printed, if I uncomment the finished event above then "RECEIVED VALUE" is printed
}
Is this intentional? If so, how do we replicate the behaviour of flatMapLatest in Combine?
I understand the difference between them, I'm looking for a way to replicate the behaviour of Rx flatMapLatest in Combine. There are a few articles/cheatsheets I've found which suggest that switchToLatest is the best candidate but as I've demonstrated in my code samples we're still missing some functionality.
I guess what I'm looking for is a way to transform an upstream publisher into another publisher and not have the stop event from the upstream cause the subscription to be cancelled unless all the downstream publishers created in the map closure emit stop events. Which is one of the behaviours of flatMap in Rx.
To be honest the RxSwift example looks like it has a bug. If mockTrigger is completed flatMapLatest should be shut down. For some reason it's not even though the upstream is dead by that time. I would actually expect exactly what Combine is doing in terms of switchToLatest: upstream died, switchToLatest is shut down, sink receives completion event, sending anything into mockDataTask should end up in the void.
I think you're correct. It would be worth it to file an issue with RxSwift to verify. It might also be useful to use a different Rx library to test for the expected behavior (though, that might lead to a false positive since it, too, could have a bug?)
I just checked the implementation of RxSwift and it looks like under the hood they check if the latest observable is known and if it is the completion won't cause the dispose of the Switch operator. This however is not mentioned in the description of the operator: ReactiveX - Switch operator
@mikevelu could you file a bug report on RxSwift's GitHub page please?
However the completion line of the upstream is not forwarded in the marble diagram either.
This is a confusing situation. The above makes less sense to me because:
boolObservable
.flatMapLatest { condition in
if condition {
return otherObservable
} else {
return .empty()
}
}
Something like this is common to be able to shut down the previous inner subscription and enter an idle state. If the inner observable completes, it won't cause the whole stream to complete. Therefore the whole stream can and should complete as soon as the upstream completes. (My guess is also that we should not wait until the flattened stream completes as it can be infinite and likely cause unexpected bugs in the program. I second the Combine behavior from the original post and personally view the RxSwift behavior as a bug here.)
This isn't an issue with the Rx implementation, it is behaving as expected. See this video for an in depth explanation of the operator (selectMany was the original name for flatMap):
Well then I must admit that I missed that critical point for all these years. If you flatten an infinite observable using flatMapLatest or even flatMap and the upstream completes you completely loose any control of the completeness of the flattened observable (unless you own it) or you dispose/cancel the entire chain.
Maybe @Tony_Parker can tell us about the behavior we see here with Combine.
Same could be said of any subscription to an infinite observable though, no? The fact you're creating the observable inside the flatMap or flatMapLatest closure isn't that significant a detail in my eyes.
As Mike mentioned, this isn't a bug in RxSwift. This is how disposal of flattened publishers is supposed to happen based on the definition of that operator.