I'm looking for some feedback on a subscriber I've been working on.
I've found in my own apps that as I've been transitioning my asynchronous code to publishers (like networking code using URLSession.DataTaskPublisher), I've had to replace code similar to this:
    client.request(urlRequest) { result in
        switch result {
        case .success(let response): // handle the response
        case .failure(let error): // handle the error
        }
    }
with code similar to this:
    client.request(urlRequest)
        .sink(receiveCompletion: { completion in
            switch completion {
            case .finished: break
            case .failure(let error): // handle the error
            }
        }, receiveValue: { response in
            // handle the response
        })
        .store(in: &requests)
There's a bit more code, which is mostly fine, but my issue is that a lot of that new code doesn't really do anything (case .finished: break). I found that I started to miss working with a single result closure, so I set out to write a "result subscriber" that would give me the best of both worlds:
    import Combine

    extension Subscribers {
        final class Result<Input, Failure>: Subscriber, Cancellable where Failure: Error {
            private var numberOfInputsReceived = 0
            private let numberOfInputsUntilCompletion: Int?
            let receiveResult: (Swift.Result<Input, Failure>) -> Void
            private var latestInput: Input?
            private var subscription: Subscription?

            /// Creates a result subscriber with a demand and a result closure.
            /// - Parameter numberOfInputsUntilCompletion: The number of inputs that will be received before calling the result closure.
            /// **Must be a value greater than zero.**
            ///
            /// Passing `nil` indicates that the result closure won't be called until the upstream completes. This
            /// subscriber requests unlimited demand from its upstream.
            /// Passing `n` would indicate that the result closure should be called after receiving `n` inputs and would be called
            /// with only the latest. This subscriber will request a max of `n` demand from its upstream. If `n` inputs are not received
            /// before the upstream completes, the result closure will still be called. At least 1 input must be received for the result
            /// closure to be called (if no inputs are received before the upstream completes, the result closure will never be called.)
            /// - Parameter receiveResult: Closure which will be called with a `Result` once the desired number of inputs has been received.
            /// The closure is always called with the latest input.
            init(completesAfter numberOfInputsUntilCompletion: Int?, receiveResult: @escaping (Swift.Result<Input, Failure>) -> Void) {
                if let n = numberOfInputsUntilCompletion {
                    precondition(n > 0, "numberOfInputsUntilCompletion must be a value greater than zero.")
                }
                self.numberOfInputsUntilCompletion = numberOfInputsUntilCompletion
                self.receiveResult = receiveResult
            }

            func receive(subscription: Subscription) {
                self.subscription = subscription
                if let demand = numberOfInputsUntilCompletion {
                    subscription.request(.max(demand))
                } else {
                    subscription.request(.unlimited)
                }
            }

            func receive(_ input: Input) -> Subscribers.Demand {
                latestInput = input
                numberOfInputsReceived += 1
                if let threshold = numberOfInputsUntilCompletion {
                    if numberOfInputsReceived >= threshold {
                        // Threshold reached: cancel the upstream first so that a
                        // later completion cannot deliver a second result.
                        subscription?.cancel()
                        subscription = nil
                        receiveResult(.success(input))
                    }
                    // Demand is additive. `.max(threshold)` was already requested in
                    // `receive(subscription:)`, so no additional demand is returned here.
                    return .none
                } else {
                    return .unlimited
                }
            }

            func receive(completion: Subscribers.Completion<Failure>) {
                switch completion {
                case .failure(let error):
                    receiveResult(.failure(error))
                case .finished:
                    guard let input = latestInput else {
                        // We have two options here:
                        // 1. We can treat this as an error and abort:
                        // fatalError("Finished without receiving any input")
                        //
                        // or
                        //
                        // 2. We can return early and not call the result handler.
                        // In this case, the result handler will never be called:
                        return
                    }
                    receiveResult(.success(input))
                }
            }

            func cancel() {
                subscription?.cancel()
            }
        }
    }

    extension Publisher {
        func result(completesAfter numberOfInputsUntilCompletion: Int? = nil, receiveResult: @escaping (Result<Self.Output, Self.Failure>) -> Void) -> AnyCancellable {
            let subscriber = Subscribers.Result(completesAfter: numberOfInputsUntilCompletion, receiveResult: receiveResult)
            subscribe(subscriber)
            return AnyCancellable(subscriber)
        }
    }
In use it would look something like:
    client.request(urlRequest)
        .result { result in
            switch result {
            case .success(let response): // handle the response
            case .failure(let error): // handle the error
            }
        }
        .store(in: &requests)
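For comparison, the completesAfter behavior described in the doc comment can be roughly approximated with built-in operators alone. This is only a minimal sketch, not the subscriber above: it assumes that prefix(_:) (stop after n inputs) followed by last() (republish only the latest input once the upstream finishes) is an acceptable stand-in, and the helper name resultViaOperators is hypothetical.

```swift
import Combine

extension Publisher {
    /// Hypothetical helper (not part of Combine): delivers one `Result` built
    /// from the latest of the first `n` inputs, or from the upstream failure.
    /// If the upstream finishes without any input, the closure is never called.
    func resultViaOperators(
        completesAfter n: Int? = nil,
        receiveResult: @escaping (Result<Output, Failure>) -> Void
    ) -> AnyCancellable {
        // With a limit, finish after `n` inputs; without one, wait for the upstream to finish.
        let limited: AnyPublisher<Output, Failure> =
            n.map { self.prefix($0).eraseToAnyPublisher() } ?? self.eraseToAnyPublisher()

        return limited
            .last() // emits only the latest input, once `limited` finishes
            .sink(receiveCompletion: { completion in
                if case .failure(let error) = completion {
                    receiveResult(.failure(error))
                }
            }, receiveValue: { value in
                receiveResult(.success(value))
            })
    }
}

// Usage with a synchronous publisher: only the latest of the first two inputs is delivered.
var received: [Result<Int, Never>] = []
let operatorCancellable = [1, 2, 3].publisher
    .resultViaOperators(completesAfter: 2) { received.append($0) }
```

The trade-off in this sketch is that the "ignore .finished" logic still exists, but it is written once inside the helper instead of at every call site.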
I've got a couple questions/areas I'd like feedback on:
- Is there another way to roughly accomplish what I'm after without writing a new subscriber (am I missing something)?
- Is what I'm doing in cancel() sufficient? Should I be doing more than just cancelling the subscription? Is it okay to hold a reference to the subscription inside of the subscriber?
- In regards to the case where the upstream never sends an input before completing, what would be the "right" thing to do in your opinion? Never call the result closure (receiveResult)? Fatal error? Something else?
- Any other feedback is much appreciated.
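To make the "something else" option for the empty-upstream case concrete, here is one possible shape, sketched with built-in operators rather than the subscriber above. It assumes it would be acceptable to widen the success type to an optional so that "no input" arrives as .success(nil); whether that is the right trade-off is exactly the open question.

```swift
import Combine

// Records every delivered result. The success type is optional so that
// "finished without any input" can be represented as `.success(nil)`.
var emptyCaseResults: [Result<Int?, Never>] = []

let emptyCaseCancellable = Empty<Int, Never>()   // finishes without sending any input
    .map { value -> Int? in value }              // widen Output from Int to Int?
    .last()                                      // keep only the latest input, if there is one
    .replaceEmpty(with: nil)                     // substitute nil when nothing was sent
    .sink { emptyCaseResults.append(.success($0)) }
```

With this shape the result closure is always called exactly once for a finishing upstream, at the cost of callers having to unwrap the success value.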