Invoking a callback from a publisher

I need a way to invoke a callback from a combine publisher and found it cumbersome to store the cancellable returned from .sink so I came up with this solution:

import Combine
import Foundation

class ResultSubscriber<Input>: Subscriber {
    typealias Failure = Error

    let callback: (Result<Input, Error>) -> Void

    init(callback: @escaping (Result<Input, Error>) -> Void) {
        self.callback = callback
    }
    
    func receive(subscription: Subscription) {
        subscription.request(.max(1))
    }
    
    func receive(_ input: Input) -> Subscribers.Demand {
        callback(.success(input))
        return .none
    }
    
    func receive(completion: Subscribers.Completion<Error>) {
        if case let .failure(error) = completion {
            callback(.failure(error))
        }
    }
}

It works for me but I have that feeling... What's the catch?

Why restrict the Failure type of your ResultSubscriber to Error? Consider making it generic over any error type.

I'm surprised it works at all.

  1. You need to store the ResultSubscriber anyway, just like the cancellable returned from .sink.
  2. The ResultSubscriber looks like an odd combination of materialize, first, and invoke.
  3. You doesn't cancel the subscription after receiving a value, which leaks resources. But
  4. Most curious, the subscription you received is released immediately, which result in a cancellation. You shouldn't have received any value at all.

Example playground:

import Combine
import Foundation

class ResultSubscriber<Input>: Subscriber {
    typealias Failure = Error
    
    let callback: (Result<Input, Error>) -> Void
    
    init(callback: @escaping (Result<Input, Error>) -> Void) {
        self.callback = callback
    }
    
    func receive(subscription: Subscription) {
        subscription.request(.max(1))
    }
    
    func receive(_ input: Input) -> Subscribers.Demand {
        callback(.success(input))
        return .none
    }
    
    func receive(completion: Subscribers.Completion<Error>) {
        if case let .failure(error) = completion {
            callback(.failure(error))
        }
    }
}


func test() -> AnyPublisher<Int, Error> {
    Just(42)
        .delay(for: 2, scheduler: DispatchQueue.main)
        .setFailureType(to: Error.self)
        .eraseToAnyPublisher()
}

let s = ResultSubscriber<Int>() { result in
    dump(result)
}

print(Date())
test().subscribe(s)
print(Date())

There are no Cancellables involved so it's not cancelled.When is it released? 🤷

When subscription leave the scope of receive(subscription:).

Your example works because delay created a temporary retain cycle, which I consider as a bug. It won't work for other publishers:

func test() -> AnyPublisher<Int, Error> {
    Timer.publish(every: 1, on: .main, in: .common)
        .map { _ in 42 }
        .setFailureType(to: Error.self)
        .eraseToAnyPublisher()
}

let s = ResultSubscriber<Int>() { result in
    dump(result)
}

print(Date())
test().subscribe(s)
print(Date())

// You forgot to start the runloop
RunLoop.main.run()
Terms of Service

Privacy Policy

Cookie Policy