Hi @DevAndArtist,
I've got that bug in my queue now and I think the behavior you're seeing is about the PassthroughSubject
and not flatMap
. PassthroughSubject
will drop values if the downstream has not made any demand for them. Because flatMap
with maxPublishers: .max(1)
will only request one value at a time from upstream, and due to the delay, all values other than the first 1
are dropped.
This works (please forgive my slight reformatting as I adjusted this to fit into my test harness):
let something = [1, 2, 3, 4, 5, 6, 7, 8, 9].publisher
.flatMap(maxPublishers: .max(1)) { value in
[Int].init(repeating: value, count: value).publisher
.flatMap { value in
Just(value)
.delay(for: .seconds(Double.random(in: 1 ... 4)), scheduler: RunLoop.main)
}
}
.sink { value in
print(value)
}
DispatchQueue.main.asyncAfter(deadline: .now() + 300) {
something.cancel()
}
RunLoop.main.run(until: Date() + 400)
or this:
let subject = PassthroughSubject<Int, Never>()
let something = subject
.buffer(size: 9, prefetch: .byRequest, whenFull: .dropOldest)
.flatMap(maxPublishers: .max(1)) { value in
[Int].init(repeating: value, count: value).publisher
.flatMap { value in
Just(value)
.delay(for: .seconds(Double.random(in: 1 ... 4)), scheduler: RunLoop.main)
}
}
.sink { value in
print(value)
}
let c = [1, 2, 3, 4, 5, 6, 7, 8, 9].publisher.subscribe(subject)
DispatchQueue.main.asyncAfter(deadline: .now() + 300) {
something.cancel()
}
RunLoop.main.run(until: Date() + 400)
c.cancel()
I haven't yet checked the rest of the comparison to concatMap
you put in the bug (thank you for all of that).