Result Subscriber

I'm looking for some feedback on a subscriber I've been working on.

I've found in my own apps that as I've been transitioning my asynchronous code to publishers (like networking code using URLSession.DataTaskPublisher) I've had to replace code similar to this:

client.request(urlRequest) { result in
   switch result {
   case .success(let response): // handle the response
   case .failure(let error): // handle the error
   }
}

with code similar to this:

client.request(urlRequest)
   .sink(receiveCompletion: { completion in
       switch completion {
       case .finished: break
       case .failure(let error): // handle the error
       }
   }, receiveValue: { response in
       // handle the response
   })
   .store(in: &requests)

There's a bit more code, which is mostly fine, but my issue is that a lot of the new code doesn't really do anything (case .finished: break). I started to miss working with a single result closure, so I set out to write a "result subscriber" that would give me the best of both worlds:

extension Subscribers {
    final class Result<Input, Failure>: Subscriber, Cancellable where Failure: Error {

        private var numberOfInputsRecieved = 0
        private let numberOfInputsUntilCompletion: Int?
        let receiveResult: (Swift.Result<Input, Failure>) -> Void

        private var latestInput: Input?
        private var subscription: Subscription?

        /// Creates a result subscriber with a demand and a result closure.
        /// - Parameter numberOfInputsUntilCompletion: The number of inputs that will be received before calling the result closure.
        ///     **Must be a value greater than zero.**
        ///
        ///     Passing `nil` indicates that the result closure won't be called until the upstream completes. This
        ///     subscriber requests unlimited demand from its upstream.
        ///     Passing `n` would indicate that the result closure should be called after receiving `n` inputs and would be called
        ///     with only the latest. This subscriber will request a max of `n` demand from its upstream. If `n` inputs are not received
        ///     before the upstream completes, the result closure will still be called. At least 1 input must be received for the result
        ///     closure to be called (if no inputs are received before the upstream completes, the result closure will never be called.)
        /// - Parameter receiveResult: Closure which will be called with a `Result` once the desired number of inputs has been received.
        ///     The closure is always called with the latest input.
        init(completesAfter numberOfInputsUntilCompletion: Int?, receiveResult: @escaping (Swift.Result<Input, Failure>) -> Void) {
            if let n = numberOfInputsUntilCompletion {
                precondition(n > 0, "numberOfInputsUntilCompletion must be a value greater than zero.")
            }
            self.numberOfInputsUntilCompletion = numberOfInputsUntilCompletion
            self.receiveResult = receiveResult
        }

        func receive(subscription: Subscription) {
            self.subscription = subscription
            if let demand = numberOfInputsUntilCompletion {
                subscription.request(.max(demand))
            } else {
                subscription.request(.unlimited)
            }
        }

        func receive(_ input: Input) -> Subscribers.Demand {
            latestInput = input
            numberOfInputsRecieved += 1
            if let threshold = numberOfInputsUntilCompletion {
                let delta = threshold - numberOfInputsRecieved
                if delta <= 0 {
                    receiveResult(.success(input))
                    return .none
                } else {
                    return .max(delta)
                }
            } else {
                return .unlimited
            }
        }

        func receive(completion: Subscribers.Completion<Failure>) {
            switch completion {
            case .failure(let error):
                receiveResult(.failure(error))

            case .finished:
                guard let input = latestInput else {
                    // We have two options here:
                    // 1. We can treat this an error and abort:
                    // fatalError("Finished without receiving any input")
                    //
                    // or
                    //
                    // 2. We can return early and not call the result handler.
                    //    In this case, the result handler will never be called:
                    return
                }

                receiveResult(.success(input))
            }
        }

        func cancel() {
            subscription?.cancel()
        }
    }
}

extension Publisher {
    func result(completesAfter numberOfInputsUntilCompletion: Int? = nil, receiveResult: @escaping (Result<Self.Output, Self.Failure>) -> Void) -> AnyCancellable {
        let subscriber = Subscribers.Result(completesAfter: numberOfInputsUntilCompletion, receiveResult: receiveResult)
        subscribe(subscriber)
        return AnyCancellable(subscriber)
    }
}

In use it would look something like:

client.request(urlRequest)
   .result { result in
       switch result {
       case .success(let response): // handle the response
       case .failure(let error): // handle the error
       }
   }
   .store(in: &requests)

I've got a couple questions/areas I'd like feedback on:

  1. Is there another way to roughly accomplish what I'm after without writing a new subscriber (am I missing something)?
  2. Is what I'm doing in cancel() sufficient? Should I be doing more than just cancelling the subscription? Is it okay to hold a reference to the subscription inside of the subscriber?
  3. With regard to the case where the upstream never sends an input before completing, what would be the "right" thing to do in your opinion? Never call the result closure (receiveResult)? Fatal error? Something else?
  4. Any other feedback is much appreciated.

You can use built-in operators to map the output and failure to a Result type.

publisher
    .map(Result.success)
    .catch { error in Just(.failure(error)) }
    .sink { result in print(result) }
    .store(in: &subscriptions)

If you really need a custom subscriber, I would recommend wrapping a built-in subscriber and forwarding incoming events to it.

final class SinkResult<Input, Failure> where Failure: Error {
    
    private let sink: Subscribers.Sink<Input, Failure>
    
    public init(
        receiveValue: @escaping (Result<Input, Failure>) -> Void
    ) {
        
        self.sink = .init(
            receiveCompletion: { completion in
              
                switch completion {
                    
                case .finished: break
                
                case let .failure(error): receiveValue(.failure(error))
                    
                }
                    
            },
            receiveValue: { input in receiveValue(.success(input)) }
        )
        
    }
    
}

extension SinkResult: Subscriber {
    
    var combineIdentifier: CombineIdentifier { sink.combineIdentifier }
    
    func receive(_ input: Input) -> Subscribers.Demand {
        
        sink.receive(input)
        
    }
    
    func receive(subscription: Subscription) {
        
        sink.receive(subscription: subscription)
        
    }
    
    func receive(completion: Subscribers.Completion<Failure>) {
        
        sink.receive(completion: completion)
        
    }
    
}

I knew I was overdoing it — thanks for the example.

That seems like a good approach as well, I'm sure there are aspects I haven't considered that Sink takes into account.

Thanks for the feedback, @royhsu!

There is a slight variation in behavior with both approaches in that every value/failure becomes a Result instead of only the final value, but for what I'm after that's actually fine.

I think I'll just create an operator extension:

extension Publisher {
    func result(_ receiveResult: @escaping (Result<Output, Failure>) -> Void) -> AnyCancellable {
        map(Result.success)
        .catch { Just(.failure($0)) }
        .sink(receiveValue: receiveResult)
    }
}
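
For callers who do want only the final value rather than a Result per value, one option is to put last() ahead of the mapping. This is a minimal sketch, not code from the thread: lastResult is a hypothetical name, and the explicit Result<Output, Failure> annotations are only there to help type inference.

```swift
import Combine

extension Publisher {
    /// Hypothetical operator: delivers a single `Result` carrying either the
    /// last value the upstream emitted before finishing, or its failure.
    func lastResult(_ receiveResult: @escaping (Result<Output, Failure>) -> Void) -> AnyCancellable {
        last()                                                    // keep only the final value
            .map { Result<Output, Failure>.success($0) }          // wrap it as a success
            .catch { Just(Result<Output, Failure>.failure($0)) }  // turn a failure into a value
            .sink(receiveValue: receiveResult)
    }
}

// Usage: a sequence publisher emits synchronously on subscription.
var received: [Result<Int, Never>] = []
let lastResultCancellable = [1, 2, 3].publisher.lastResult { received.append($0) }
// `received` now holds a single `.success(3)`.
```

If the upstream finishes without emitting anything, last() emits nothing either, so the closure is never called in that case.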

Because catch(_:) loses the connection to the original publisher once it fires, the downstream subscriber won't receive further values.

If you want to continue receiving values after a failure, you will need to wrap the failable work in flatMap(_:). For more detail, I would recommend watching this awesome video from Apple.
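
A rough sketch of that flatMap(_:) wrapping, under the assumption that the failure comes from a per-value step. ParseError and parse(_:) are hypothetical stand-ins for whatever failable work you are doing:

```swift
import Combine

// Hypothetical error type for this example.
struct ParseError: Error, Equatable { let text: String }

// Hypothetical failable step: publishes the parsed Int, or fails.
func parse(_ text: String) -> Result<Int, ParseError>.Publisher {
    guard let number = Int(text) else {
        return Result<Int, ParseError>.Publisher(.failure(ParseError(text: text)))
    }
    return Result<Int, ParseError>.Publisher(.success(number))
}

let inputs = PassthroughSubject<String, Never>()
var results: [Result<Int, ParseError>] = []

let parseCancellable = inputs
    .flatMap { text in
        // The catch lives inside flatMap, so a failure only ends this
        // inner publisher, not the subscription to `inputs`.
        parse(text)
            .map { Result<Int, ParseError>.success($0) }
            .catch { Just(Result<Int, ParseError>.failure($0)) }
    }
    .sink { results.append($0) }

inputs.send("1")
inputs.send("oops")
inputs.send("3")
// `results` holds .success(1), .failure(ParseError(text: "oops")), .success(3)
```

Each failure is caught inside the flatMap closure, so only the inner publisher is replaced and the subject keeps delivering later values.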

Wrapping all the transforms in an operator makes it easy for the call site to forget about that side effect, so it's worth adding some documentation to your custom operator.


Right, I should’ve just said values.

I ended up doing basically the same thing, but keeping it as a Publisher so that I can compose further downstream, or swap the subscriber for assign(to:on:).
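
For illustration only, a publisher-returning version might look roughly like the following. This is a sketch, not the poster's actual code: asResult and ViewModel are hypothetical names.

```swift
import Combine

extension Publisher {
    /// Hypothetical name. Same transform as the sink-based operator,
    /// but it stays a publisher so callers can keep composing.
    func asResult() -> AnyPublisher<Result<Output, Failure>, Never> {
        map { Result<Output, Failure>.success($0) }
            .catch { Just(Result<Output, Failure>.failure($0)) }
            .eraseToAnyPublisher()
    }
}

// Hypothetical model object used as an assign(to:on:) target.
final class ViewModel {
    var latest: Result<Int, Never> = .success(0)
}

let model = ViewModel()
let assignCancellable = [1, 2, 3].publisher
    .asResult()
    .assign(to: \.latest, on: model)
// model.latest is now .success(3)
```

Because the failure type of the returned publisher is Never, it works with assign(to:on:) and sink(receiveValue:) alike.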

Would you mind sharing your solution here? I'm having trouble getting anything to work for all publishers.

I'm also in the market for the opposite of this thread: turning a <Result<Success, Failure>, Never> publisher into a <Success, Failure> publisher. So far I just have:

somethingProducingResult
    .setFailureType(to: Failure.self)
    .flatMap { result in Future { promise in promise(result) } }

Trying to extract that into reusable code is also proving troublesome.

Are you looking for something like this?

extension Publisher {
    func dematerialize<S, E>() -> AnyPublisher<S, E> where Self.Output == Result<S, E>, Self.Failure == Never {
        self.setFailureType(to: E.self)
            .flatMap { Result.Publisher($0) }
            .eraseToAnyPublisher()
    }
}

struct MyError: Error {}

let resultPublisher = PassthroughSubject<Result<Int, MyError>, Never>()

let subscription = resultPublisher
    .dematerialize()
    .sink(receiveCompletion: { completion in
        switch completion {
        case .finished: print("finished")
        case .failure(let error): print("failure: \(error)")
        }
    }, receiveValue: { value in
        print("value: \(value)")
    })

resultPublisher.send(.success(1))
resultPublisher.send(.success(2))
resultPublisher.send(.success(3))
resultPublisher.send(.failure(MyError()))
resultPublisher.send(.success(42))

// value: 1
// value: 2
// value: 3
// failure: MyError()

I've called it dematerialize even though it isn't really that.


Yes, thanks! I can't believe I didn’t see Result.Publisher until now. :/

Happy to help! Yeah, the fact that it's tucked under the Result namespace makes it a little hard to discover.