Hi @Tony_Parker, thank you for tackling this issue. I must admit that I'm not familiar with the concept of back pressure yet, it does not exist in context of RxSwift. Therefore my assumptions about the behavior of reactive streams might be slightly off then defined in Combine.
I eventually tried to build my own ConcatMap
publisher, but I failed at several points:
- Mine only operates on finite upstreams - this is totally wrong
- Instead of
switchToLatest
, which does not work, I had to use flatMap
again.
This example produces the expected output, but it's still not fully correct as just mentioned.
extension Publishers {
public struct ConcatMap<NewPublisher, Upstream>:
Publisher
where
NewPublisher: Publisher,
Upstream: Publisher,
NewPublisher.Failure == Upstream.Failure
{
public typealias Output = NewPublisher.Output
public typealias Failure = Upstream.Failure
public let upstream: Upstream
public let transform: (Upstream.Output) -> NewPublisher
public init(
upstream: Upstream,
transform: @escaping (Upstream.Output) -> NewPublisher
) {
self.upstream = upstream
self.transform = transform
}
public func receive<S>(subscriber: S)
where
S: Subscriber,
NewPublisher.Output == S.Input,
Upstream.Failure == S.Failure
{
let transform = self.transform
let start = Empty<Output, Failure>(completeImmediately: true)
.eraseToAnyPublisher()
// This implementation is meh, because it requires `upstream` to be
// finite and complete first before we can start appending.
upstream
.map { output -> AnyPublisher<Output, Failure> in
transform(output).eraseToAnyPublisher()
}
.reduce(start) { result, publisher in
result.append(publisher).eraseToAnyPublisher()
}
// I don't know why flat map is needed here.
.flatMap { $0 }
// I though `switchToLatest` like `flatMapLatest` from RxSwift is
// what I need here to kick off everything, but it does not work.
// .switchToLatest()
.receive(subscriber: subscriber)
}
}
}
extension Publisher {
public func concatMap<T, P>(
_ transform: @escaping (Self.Output) -> P
) -> Publishers.ConcatMap<P, Self>
where
T == P.Output,
P: Publisher,
Self.Failure == P.Failure
{
return Publishers.ConcatMap(upstream: self, transform: transform)
}
}
let page = PlaygroundPage.current
page.needsIndefiniteExecution = true
let publisher = PassthroughSubject<Int, Never>()
let something = publisher
.concatMap { value in
Publishers.Sequence(sequence: [Int](repeating: value, count: value))
.flatMap { value in
Just(value).delay(
for: .seconds(Double.random(in: 1 ... 4)),
scheduler: RunLoop.main
)
}
}
.sink { value in
print(value)
}
Publishers.Sequence(sequence: [1, 2, 3, 4, 5, 6, 7, 8, 9]).subscribe(publisher)
DispatchQueue.main.asyncAfter(deadline: .now() + 120) {
something.cancel()
page.finishExecution()
}
I think in case of concatMap
we really shouldn't make any assumption about the upstream. I linked a few articles related to the operator, which also mention that concatMap
isn't efficient in terms of async work, because if the upstream will fire rapidly but concatMap
will process everything slower and preserve order and wait until each merged publisher completes, the processing time may grow very fast. The made up example above is just perfect for that scenario, but it's exactly what I expect from this operator.
In our app I only have two usages of concatMap
. First, that does firmware updates in sequence, handles errors if any happen, retries if needed, then process any pending firmware updates. Second, we queue bluetooth events and transform those into UI events that are synced with a specific time interval. So if events arrive faster, they will be enqueued, then processed when needed by the interval tick.
This leads me to the question, does .buffer(size: ..., prefetch: .byRequest ...
acts like a queue, or are all buffered elements always kept in the buffer?
I rewrote the ConcatMap
operator using buffer
and flatMap(maxPublishers:)
, but I don't know if Buffer
will eventually explode if it's not acting like a FIFO queue.
extension Publishers {
public struct ConcatMap<NewPublisher, Upstream>:
Publisher
where
NewPublisher: Publisher,
Upstream: Publisher,
NewPublisher.Failure == Upstream.Failure
{
public typealias Output = NewPublisher.Output
public typealias Failure = Upstream.Failure
public let upstream: Upstream
public let transform: (Upstream.Output) -> NewPublisher
public init(
upstream: Upstream,
transform: @escaping (Upstream.Output) -> NewPublisher
) {
self.upstream = upstream
self.transform = transform
}
public func receive<S>(subscriber: S)
where
S: Subscriber,
NewPublisher.Output == S.Input,
Upstream.Failure == S.Failure
{
upstream
// - Is this potentially a FIFO queue?
// - I hope it uses a (dynamic) Array so it can grow and
// shrink in size as needed.
.buffer(size: .max, prefetch: .byRequest, whenFull: .dropNewest)
.flatMap(maxPublishers: .max(1), transform)
.receive(subscriber: subscriber)
}
}
}