What is the best way to make a Combine publisher pausable, so that while paused it temporarily stops emitting events but buffers any that arrive, and when resumed it first emits everything in the buffer and then continues emitting normally?
I came up with the following, which works, but it doesn't feel like the best way to solve this problem:
extension Publisher {
    /// Attaches a `PausableSinkSubscriber` to this publisher and returns it.
    ///
    /// - Parameters:
    ///   - receiveValue: Closure invoked for each value the subscriber delivers.
    ///   - receiveCompletion: Closure invoked with the publisher's completion.
    /// - Returns: The subscriber that was subscribed to this publisher. Its
    ///   `isPaused` property controls pausing and `cancel()` ends the subscription.
    func pausableSink(
        receiveValue: @escaping (Output) -> Void,
        receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void
    ) -> PausableSinkSubscriber<Output, Failure> {
        let sink = PausableSinkSubscriber<Output, Failure>(
            receiveValue: receiveValue,
            receiveCompletion: receiveCompletion
        )
        subscribe(sink)
        return sink
    }

    /// Attaches a `PausableSinkSubscriber` to a publisher that never fails.
    ///
    /// No completion handler is installed by this overload.
    ///
    /// - Parameter receiveValue: Closure invoked for each value the subscriber delivers.
    /// - Returns: The subscriber that was subscribed to this publisher.
    func pausableSink(
        _ receiveValue: @escaping (Output) -> Void
    ) -> PausableSinkSubscriber<Output, Failure> where Failure == Never {
        let sink = PausableSinkSubscriber<Output, Failure>(receiveValue: receiveValue)
        subscribe(sink)
        return sink
    }
}
/// A sink-style subscriber whose delivery can be paused.
///
/// While `isPaused` is `true`, incoming values (and a completion, if one
/// arrives) are held back. Setting `isPaused` to `false` delivers the
/// buffered values in arrival order, followed by any held completion, and
/// then delivery continues normally.
///
/// NOTE(review): no synchronization is performed here; this presumably
/// expects all calls to happen on a single thread/queue — confirm with callers.
class PausableSinkSubscriber<Input, Failure: Error>: Subscriber, Cancellable {
    /// The upstream subscription; cleared on cancellation and on completion.
    var subscription: Subscription?

    /// Controls whether values are delivered (`false`) or buffered (`true`).
    /// Switching to `false` flushes everything buffered so far.
    var isPaused = false {
        didSet {
            if !isPaused {
                sendBuffer()
            }
        }
    }

    /// Values received while delivery was held back, oldest first.
    var buffer = [Input]()

    /// Closure invoked for each delivered value.
    var receiveValue: (Input) -> Void

    /// Closure invoked with the completion, if one was supplied.
    var receiveCompletion: ((Subscribers.Completion<Failure>) -> Void)?

    /// A completion that arrived while values were still being held back.
    /// It is delivered only after the buffer has been drained, so the
    /// completion never overtakes buffered values.
    private var pendingCompletion: Subscribers.Completion<Failure>?

    /// Whether new events must queue behind already-buffered ones.
    /// The buffer is non-empty while unpaused only in the middle of a flush.
    private var mustHoldBack: Bool {
        isPaused || !buffer.isEmpty
    }

    /// Creates a subscriber with both a value and a completion handler.
    init(
        receiveValue: @escaping (Input) -> Void,
        receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void
    ) {
        self.receiveValue = receiveValue
        self.receiveCompletion = receiveCompletion
    }

    /// Creates a subscriber with a value handler only.
    init(
        receiveValue: @escaping (Input) -> Void
    ) {
        self.receiveValue = receiveValue
        self.receiveCompletion = nil
    }

    deinit {
        cancel()
    }

    /// Stores the subscription and requests unlimited values from upstream.
    func receive(subscription: Subscription) {
        self.subscription = subscription
        subscription.request(.unlimited)
    }

    /// Delivers `input` immediately, or buffers it when delivery is held back.
    func receive(_ input: Input) -> Subscribers.Demand {
        if mustHoldBack {
            buffer.append(input)
        } else {
            receiveValue(input)
        }
        return self.demand
    }

    /// Delivers buffered values in order, then any held completion.
    ///
    /// Stops early if a value handler pauses the subscriber again; whatever
    /// remains stays buffered until the next resume.
    func sendBuffer() {
        while !isPaused, !buffer.isEmpty {
            let value = buffer.removeFirst()
            receiveValue(value)
        }
        guard !isPaused, let completion = pendingCompletion else { return }
        pendingCompletion = nil
        receiveCompletion?(completion)
    }

    /// Delivers the completion, or holds it until buffered values have been
    /// flushed so that it is always the last event the handlers see.
    func receive(completion: Subscribers.Completion<Failure>) {
        // Upstream is finished; nothing is left to cancel.
        subscription = nil
        if mustHoldBack {
            pendingCompletion = completion
        } else {
            receiveCompletion?(completion)
        }
    }

    /// Additional demand returned from `receive(_:)`.
    ///
    /// `.unlimited` is already requested in `receive(subscription:)` and
    /// demand is additive, so this value should not affect how many values
    /// upstream sends. Kept unchanged for source compatibility.
    var demand: Subscribers.Demand {
        isPaused ? .unlimited : .none
    }

    /// Cancels the upstream subscription and discards anything still
    /// buffered, so a later resume cannot emit values after cancellation.
    func cancel() {
        subscription?.cancel()
        subscription = nil
        buffer.removeAll()
        pendingCompletion = nil
    }
}
// Demo: print a timer tick every second on the main run loop.
let timerPub = Timer.publish(every: 1, on: .main, in: .default).autoconnect()
let timerSub = timerPub.pausableSink { tick in
    print(tick)
}

// About three seconds after scheduling, pause the subscriber.
DispatchQueue.main.asyncAfter(deadline: .now() + 3, execute: {
    print("pausing publisher")
    timerSub.isPaused = true
})

// About six seconds after scheduling, resume the subscriber.
DispatchQueue.main.asyncAfter(deadline: .now() + 6, execute: {
    print("resuming publisher")
    timerSub.isPaused = false
})