barksten
(Anders östlin)
1
I need a way to invoke a callback from a combine publisher and found it cumbersome to store the cancellable returned from .sink so I came up with this solution:
import Combine
import Foundation
class ResultSubscriber<Input>: Subscriber {
typealias Failure = Error
let callback: (Result<Input, Error>) -> Void
init(callback: @escaping (Result<Input, Error>) -> Void) {
self.callback = callback
}
func receive(subscription: Subscription) {
subscription.request(.max(1))
}
func receive(_ input: Input) -> Subscribers.Demand {
callback(.success(input))
return .none
}
func receive(completion: Subscribers.Completion<Error>) {
if case let .failure(error) = completion {
callback(.failure(error))
}
}
}
It works for me but I have that feeling... What's the catch?
Why restrict the Failure type of your ResultSubscriber to Error? Consider making it generic over any error type.
I'm surprised it works at all.
- You need to store the
ResultSubscriber anyway, just like the cancellable returned from .sink.
- The
ResultSubscriber looks like an odd combination of materialize, first, and invoke.
- You doesn't cancel the
subscription after receiving a value, which leaks resources. But
- Most curious, the
subscription you received is released immediately, which result in a cancellation. You shouldn't have received any value at all.
barksten
(Anders östlin)
4
Example playground:
import Combine
import Foundation
class ResultSubscriber<Input>: Subscriber {
typealias Failure = Error
let callback: (Result<Input, Error>) -> Void
init(callback: @escaping (Result<Input, Error>) -> Void) {
self.callback = callback
}
func receive(subscription: Subscription) {
subscription.request(.max(1))
}
func receive(_ input: Input) -> Subscribers.Demand {
callback(.success(input))
return .none
}
func receive(completion: Subscribers.Completion<Error>) {
if case let .failure(error) = completion {
callback(.failure(error))
}
}
}
func test() -> AnyPublisher<Int, Error> {
Just(42)
.delay(for: 2, scheduler: DispatchQueue.main)
.setFailureType(to: Error.self)
.eraseToAnyPublisher()
}
let s = ResultSubscriber<Int>() { result in
dump(result)
}
print(Date())
test().subscribe(s)
print(Date())
There are no Cancellables involved so it's not cancelled.When is it released? 
When subscription leave the scope of receive(subscription:).
Your example works because delay created a temporary retain cycle, which I consider as a bug. It won't work for other publishers:
func test() -> AnyPublisher<Int, Error> {
Timer.publish(every: 1, on: .main, in: .common)
.map { _ in 42 }
.setFailureType(to: Error.self)
.eraseToAnyPublisher()
}
let s = ResultSubscriber<Int>() { result in
dump(result)
}
print(Date())
test().subscribe(s)
print(Date())
// You forgot to start the runloop
RunLoop.main.run()