barksten
(Anders östlin)
March 17, 2021, 3:27pm
1
I needed a way to invoke a callback from a Combine publisher and found it cumbersome to store the cancellable returned from .sink, so I came up with this solution:
import Combine
import Foundation

class ResultSubscriber<Input>: Subscriber {
    typealias Failure = Error

    let callback: (Result<Input, Error>) -> Void

    init(callback: @escaping (Result<Input, Error>) -> Void) {
        self.callback = callback
    }

    func receive(subscription: Subscription) {
        subscription.request(.max(1))
    }

    func receive(_ input: Input) -> Subscribers.Demand {
        callback(.success(input))
        return .none
    }

    func receive(completion: Subscribers.Completion<Error>) {
        if case let .failure(error) = completion {
            callback(.failure(error))
        }
    }
}
It works for me but I have that feeling... What's the catch?
Why restrict the Failure type of your ResultSubscriber to Error? Consider making it generic over any error type.
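A minimal sketch of what that could look like, assuming the same one-shot behaviour as the original. The name GenericResultSubscriber is hypothetical, and the subscription handling is deliberately left unchanged, so the concerns about retaining and cancelling the subscription still apply:

```swift
import Combine

// Hypothetical variant of ResultSubscriber that is generic over the failure
// type instead of fixing it to Error. Not part of Combine.
final class GenericResultSubscriber<Input, Failure: Error>: Subscriber {
    let callback: (Result<Input, Failure>) -> Void

    init(callback: @escaping (Result<Input, Failure>) -> Void) {
        self.callback = callback
    }

    func receive(subscription: Subscription) {
        // Same as the original: ask for a single value and do not keep
        // a reference to the subscription.
        subscription.request(.max(1))
    }

    func receive(_ input: Input) -> Subscribers.Demand {
        callback(.success(input))
        return .none
    }

    func receive(completion: Subscribers.Completion<Failure>) {
        if case let .failure(error) = completion {
            callback(.failure(error))
        }
    }
}

// Usage: Just has Failure == Never, so no setFailureType(to:) is needed.
var received: [Int] = []
Just(42).subscribe(GenericResultSubscriber<Int, Never> { result in
    if case let .success(value) = result {
        received.append(value)
    }
})
```

With this shape the callback gets a typed Result, so a caller subscribing to a publisher that cannot fail does not have to handle an error case that can never occur.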
I'm surprised it works at all.
You need to store the ResultSubscriber anyway, just like the cancellable returned from .sink.
The ResultSubscriber looks like an odd combination of materialize, first, and invoke.
You don't cancel the subscription after receiving a value, which leaks resources.
But most curiously, the subscription you receive is released immediately, which results in a cancellation. You shouldn't have received any value at all.
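One possible way to address the last two points is to keep a strong reference to the subscription and cancel it once the first value has arrived. This is only a sketch under assumptions: the name RetainingResultSubscriber is hypothetical, the class is not thread-safe, and holding the subscription means the subscriber and the upstream chain keep each other alive until a value, a completion, or an explicit cancel():

```swift
import Combine

// Hypothetical one-shot subscriber that retains its subscription.
// Not part of Combine, and not synchronized for concurrent use.
final class RetainingResultSubscriber<Input, Failure: Error>: Subscriber, Cancellable {
    private var subscription: Subscription?
    private let callback: (Result<Input, Failure>) -> Void

    init(callback: @escaping (Result<Input, Failure>) -> Void) {
        self.callback = callback
    }

    func receive(subscription: Subscription) {
        // Keep the subscription alive past the end of this method.
        self.subscription = subscription
        subscription.request(.max(1))
    }

    func receive(_ input: Input) -> Subscribers.Demand {
        callback(.success(input))
        // One value is all we asked for, so tear the subscription down.
        cancel()
        return .none
    }

    func receive(completion: Subscribers.Completion<Failure>) {
        subscription = nil
        if case let .failure(error) = completion {
            callback(.failure(error))
        }
    }

    func cancel() {
        subscription?.cancel()
        subscription = nil
    }
}

// Usage with a subject, so values can be sent by hand.
let subject = PassthroughSubject<Int, Never>()
var values: [Int] = []
let subscriber = RetainingResultSubscriber<Int, Never> { result in
    if case let .success(value) = result {
        values.append(value)
    }
}
subject.subscribe(subscriber)
subject.send(1)
subject.send(2) // not delivered: demand was 1 and the subscription is cancelled
```

This is roughly what first() followed by sink already gives you, which is why storing the cancellable from .sink is usually the simpler route.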
barksten
(Anders östlin)
April 13, 2021, 5:59am
4
Example playground:
import Combine
import Foundation

class ResultSubscriber<Input>: Subscriber {
    typealias Failure = Error

    let callback: (Result<Input, Error>) -> Void

    init(callback: @escaping (Result<Input, Error>) -> Void) {
        self.callback = callback
    }

    func receive(subscription: Subscription) {
        subscription.request(.max(1))
    }

    func receive(_ input: Input) -> Subscribers.Demand {
        callback(.success(input))
        return .none
    }

    func receive(completion: Subscribers.Completion<Error>) {
        if case let .failure(error) = completion {
            callback(.failure(error))
        }
    }
}

func test() -> AnyPublisher<Int, Error> {
    Just(42)
        .delay(for: 2, scheduler: DispatchQueue.main)
        .setFailureType(to: Error.self)
        .eraseToAnyPublisher()
}

let s = ResultSubscriber<Int>() { result in
    dump(result)
}

print(Date())
test().subscribe(s)
print(Date())
There are no Cancellables involved, so it's not cancelled. When is it released?
When the subscription leaves the scope of receive(subscription:).
Your example works because delay creates a temporary retain cycle, which I consider a bug. It won't work with other publishers:
func test() -> AnyPublisher<Int, Error> {
    Timer.publish(every: 1, on: .main, in: .common)
        .map { _ in 42 }
        .setFailureType(to: Error.self)
        .eraseToAnyPublisher()
}

let s = ResultSubscriber<Int>() { result in
    dump(result)
}

print(Date())
test().subscribe(s)
print(Date())

// You forgot to start the runloop
RunLoop.main.run()