I'm looking for some feedback on a subscriber I've been working on.
I've found in my own apps that as I've been transitioning my asynchronous code to publishers (like networking code using URLSession.DataTaskPublisher) I've had to replace code similar to this:
client.request(urlRequest) { result in
switch result {
case .success(let response): // handle the response
case .failure(let error): // handle the error
}
}
with code similar to this:
client.request(urlRequest)
.sink(receiveCompletion: { completion in
switch completion {
case .finished: break
case .failure(let error): // handle the error
}, receiveValue: { response in
// handle the response
})
.store(in: &requests)
There's a bit more code, which is mostly fine, but my issue is that here a lot of that new code doesn't really do anything (case .finished: break). I found that I started to miss working with a single result closure. And so I set out to write a "result subscriber" that would give me the best of both worlds:
extension Subscribers {
final class Result<Input, Failure>: Subscriber, Cancellable where Failure: Error {
private var numberOfInputsRecieved = 0
private let numberOfInputsUntilCompletion: Int?
let receiveResult: (Swift.Result<Input, Failure>) -> Void
private var latestInput: Input?
private var subscription: Subscription?
/// Creates a result subscriber with a demand and a result closure.
/// - Parameter numberOfInputsUntilCompletion: The number of inputs that will be received before calling the result closure.
/// **Must be a value greater than zero.**
///
/// Passing `nil` indicates that the result closure won't be called until the upstream completes. This
/// subscriber requests unlimited demand from its upstream.
/// Passing `n` would indicate that the result closure should be called after receiving `n` inputs and would be called
/// with only the latest. This subscriber will request a max of `n` demand from its upstream. If `n` inputs are not received
/// before the upstream completes, the result closure will still be called. At least 1 input must be received for the result
/// closure to be called (if no inputs are received before the upstream completes, the result closure will never be called.)
/// - Parameter receiveResult: Closure which will be called with a `Result` once the desired number of inputs has been received.
/// The closure is always called with the latest input.
init(completesAfter numberOfInputsUntilCompletion: Int?, receiveResult: @escaping (Swift.Result<Input, Failure>) -> Void) {
if let n = numberOfInputsUntilCompletion {
precondition(n > 0, "numberOfInputsUntilCompletion must be a value greater than zero.")
}
self.numberOfInputsUntilCompletion = numberOfInputsUntilCompletion
self.receiveResult = receiveResult
}
func receive(subscription: Subscription) {
self.subscription = subscription
if let demand = numberOfInputsUntilCompletion {
subscription.request(.max(demand))
} else {
subscription.request(.unlimited)
}
}
func receive(_ input: Input) -> Subscribers.Demand {
latestInput = input
numberOfInputsRecieved += 1
if let threshold = numberOfInputsUntilCompletion {
let delta = threshold - numberOfInputsRecieved
if delta <= 0 {
receiveResult(.success(input))
return .none
} else {
return .max(delta)
}
} else {
return .unlimited
}
}
func receive(completion: Subscribers.Completion<Failure>) {
switch completion {
case .failure(let error):
receiveResult(.failure(error))
case .finished:
guard let input = latestInput else {
// We have two options here:
// 1. We can treat this an error and abort:
// fatalError("Finished without receiving any input")
//
// or
//
// 2. We can return early and not call the result handler.
// In this case, the result handler will never be called:
return
}
receiveResult(.success(input))
}
}
func cancel() {
subscription?.cancel()
}
}
}
extension Publisher {
func result(completesAfter numberOfInputsUntilCompletion: Int? = nil, receiveResult: @escaping (Result<Self.Output, Self.Failure>) -> Void) -> AnyCancellable {
let subscriber = Subscribers.Result(completesAfter: numberOfInputsUntilCompletion, receiveResult: receiveResult)
subscribe(subscriber)
return AnyCancellable(subscriber)
}
}
In use it would look something like:
client.request(urlRequest)
.result { result in
switch result {
case .success(let response): // handle the response
case .failure(let error): // handle the error
}
}
.store(in: &requests)
I've got a couple questions/areas I'd like feedback on:
- Is there another way to roughly accomplish what I'm after without writing a new subscriber (am I missing something?)
- Is what I'm doing in
cancel() sufficient? Should I be doing more than just cancelling the subscription? Is it okay to hold a reference to the subscription inside of the subscriber?
- In regards to the case where the upstream never sends an input before completing, what would be the "right" thing to do in your opinion? Never call the result closure (
receiveResult)? Fatal error? Something else?
- Any other feedback is much appreciated.