barksten
(Anders östlin)
March 17, 2021, 3:27pm
1
I need a way to invoke a callback from a combine publisher and found it cumbersome to store the cancellable returned from .sink
so I came up with this solution:
import Combine
import Foundation
class ResultSubscriber<Input>: Subscriber {
typealias Failure = Error
let callback: (Result<Input, Error>) -> Void
init(callback: @escaping (Result<Input, Error>) -> Void) {
self.callback = callback
}
func receive(subscription: Subscription) {
subscription.request(.max(1))
}
func receive(_ input: Input) -> Subscribers.Demand {
callback(.success(input))
return .none
}
func receive(completion: Subscribers.Completion<Error>) {
if case let .failure(error) = completion {
callback(.failure(error))
}
}
}
It works for me but I have that feeling... What's the catch?
Why restrict the Failure
type of your ResultSubscriber
to Error
? Consider making it generic over any error type.
I'm surprised it works at all.
You need to store the ResultSubscriber
anyway, just like the cancellable returned from .sink
.
The ResultSubscriber
looks like an odd combination of materialize , first , and invoke .
You doesn't cancel the subscription
after receiving a value, which leaks resources. But
Most curious, the subscription
you received is released immediately, which result in a cancellation. You shouldn't have received any value at all.
barksten
(Anders östlin)
April 13, 2021, 5:59am
4
Example playground:
import Combine
import Foundation
class ResultSubscriber<Input>: Subscriber {
typealias Failure = Error
let callback: (Result<Input, Error>) -> Void
init(callback: @escaping (Result<Input, Error>) -> Void) {
self.callback = callback
}
func receive(subscription: Subscription) {
subscription.request(.max(1))
}
func receive(_ input: Input) -> Subscribers.Demand {
callback(.success(input))
return .none
}
func receive(completion: Subscribers.Completion<Error>) {
if case let .failure(error) = completion {
callback(.failure(error))
}
}
}
func test() -> AnyPublisher<Int, Error> {
Just(42)
.delay(for: 2, scheduler: DispatchQueue.main)
.setFailureType(to: Error.self)
.eraseToAnyPublisher()
}
let s = ResultSubscriber<Int>() { result in
dump(result)
}
print(Date())
test().subscribe(s)
print(Date())
There are no Cancellables involved so it's not cancelled.When is it released?
When subscription
leave the scope of receive(subscription:)
.
Your example works because delay
created a temporary retain cycle, which I consider as a bug. It won't work for other publishers:
func test() -> AnyPublisher<Int, Error> {
Timer.publish(every: 1, on: .main, in: .common)
.map { _ in 42 }
.setFailureType(to: Error.self)
.eraseToAnyPublisher()
}
let s = ResultSubscriber<Int>() { result in
dump(result)
}
print(Date())
test().subscribe(s)
print(Date())
// You forgot to start the runloop
RunLoop.main.run()