I am having an issue with Publishers.FlatMap
where a request is sent upstream immediately upon subscription, even though my Subscriber
has not made a request at that time.
I make a simple chain of operators, like so: (forgive the error mangling -- it's a reduction.)
let pub = URLSession.shared.dataTaskPublisher(for: URL(string: "https://www.swift.org/")!)
.handleEvents(receiveOutput: { print($0.0.count) })
.mapError({ _ in NetworkError() })
.flatMap({ ($0.response as? HTTPURLResponse)?.statusCode == 200
? Result.Publisher($0.data)
: Result.Publisher(NetworkError()) })
let sub = TimedSubscriber<Data, NetworkError>()
pub.subscribe(sub)
My custom subscriber makes a request of 1 in receive(subscription:)
, but only after a 2-second delay. You'd expect the output of that handleEvents
to appear after 2 seconds, but it appears immediately. (Well, after the 200-ish milliseconds of the network request.)
My custom subscriber is like so: (sorry for the code dump.)
class TimedSubscriber<Input, Failure: Error>: Subscriber {
let combineIdentifier = CombineIdentifier()
private var subscription: Subscription?
private let q = DispatchQueue(label: "timed-requests", qos: .background)
init() {}
deinit { subscription?.cancel() }
func receive(subscription: Subscription) {
print("subscribing")
self.subscription = subscription
request(after: .seconds(2))
}
func receive(_ input: Input) -> Subscribers.Demand {
print("received")
request(after: .seconds(2))
return .none
}
private func request(after waitTime: DispatchTimeInterval) {
q.schedule(after: .init(.now() + waitTime)) {
print("requesting")
self.subscription?.request(.max(1))
}
}
func receive(completion: Subscribers.Completion<Failure>) {}
}
I have found that if I use tryMap
to perform effectively the same operation, I get the output of handleEvents
at moment I expect it.
If I stick print()
in there, I see that an unlimited request has been made at the moment of subscription. What's the logic of this? Or is it a bug? It seems like the wrong behaviour to me.
Thanks!