How to pause and resume a Combine Publisher?

What would be the best way to allow a Combine publisher to be paused to temporarily stop emitting events, but buffer any incoming events, and then when resumed it will emit anything in the buffer and then continue to emit normally?

I came up with this which works, but doesn't feel like the best way to solve this problem:

extension Publisher {
  func pausableSink(
    receiveValue: @escaping (Output) -> Void,
    receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void
  ) -> PausableSinkSubscriber<Output, Failure> {
    let subscriber = PausableSinkSubscriber<Output, Failure>(
      receiveValue: receiveValue,
      receiveCompletion: receiveCompletion
    )
    self.subscribe(subscriber)
    return subscriber
  }

  func pausableSink(
    _ receiveValue: @escaping (Output) -> Void
  ) -> PausableSinkSubscriber<Output, Failure> where Failure == Never {
    let subscriber = PausableSinkSubscriber<Output, Failure>(
      receiveValue: receiveValue
    )
    self.subscribe(subscriber)
    return subscriber
  }
}

class PausableSinkSubscriber<Input, Failure: Error>: Subscriber, Cancellable {
  var subscription: Subscription?

  var isPaused = false {
    didSet {
      if !isPaused {
        sendBuffer()
      }
    }
  }
  var buffer = [Input]()
  var receiveValue: (Input) -> Void
  var receiveCompletion: ((Subscribers.Completion<Failure>) -> Void)?

  init(
    receiveValue: @escaping (Input) -> Void,
    receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void
  ) {
    self.receiveValue = receiveValue
    self.receiveCompletion = receiveCompletion
  }

  init(
    receiveValue: @escaping (Input) -> Void
  ) {
    self.receiveValue = receiveValue
    self.receiveCompletion = nil
  }

  deinit {
    cancel()
  }

  func receive(subscription: Subscription) {
    self.subscription = subscription
    subscription.request(.unlimited)
  }

  func receive(_ input: Input) -> Subscribers.Demand {
    if isPaused {
      buffer.append(input)
    } else {
      receiveValue(input)
    }
    return self.demand
  }

  func sendBuffer() {
    while !buffer.isEmpty {
      let value = buffer.remove(at: 0)
      receiveValue(value)
    }
  }

  func receive(completion: Subscribers.Completion<Failure>) {
    receiveCompletion?(completion)
  }

  var demand: Subscribers.Demand {
    isPaused ? .unlimited : .none
  }

  func cancel() {
    subscription?.cancel()
    subscription = nil
  }
}

let timerPub = Timer.publish(every: 1, on: .main, in: .default).autoconnect()
let timerSub = timerPub
      .pausableSink { print($0) }
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
  print("pausing publisher")
  timerSub.isPaused = true
}
DispatchQueue.main.asyncAfter(deadline: .now() + 6) {
  print("resuming publisher")
  timerSub.isPaused = false
}
1 Like