[Combine] How to `debounce` specific events only?

Say we have following events:

enum Events {
    case initial
    case ongoing
    case enabled
    case disabled
    case error(Error)
}

and all these events is published on an ObservableObject:

class Observer: ObservableObject {
    @Published var event: Events = .initial
    var cancellables: Set<AnyCancellable> = []
}

What I want to achieve is:

  • When receiving an .ongoing event, wait for 1 second:
    • if a new event comes within that 1 second, skip .ongoing and publish the new event;
    • otherwise, publish the .ongoing event
  • When receiving an .error event, wait for 3 second:
    • if a new event comes within that 3 second, skip .error and publish the new event;
    • otherwise, publish the .error event

I understand the .debounce operator can achieve the effects, but that is applied to event flow on all events:

    init() {
        self.$event
+            .debounce(for: .seconds(1), scheduler: DispatchQueue.main)
            .sink {
                print("Event:", $0)
            }
            .store(in: &self.cancellables)
    }

I am wondering if there is a way to make sure the debouncing logic only kicks in selectively on certain events?

I tried with following implementation, but it seems no events gets published at all:

    init() {
        self.$event
+            .flatMap {
+                switch $0 {
+                case .ongoing:
+                    return Just($0)
+                        .debounce(for: .seconds(1), scheduler: DispatchQueue.main)
+
+                case .error:
+                    return Just($0)
+                        .debounce(for: .seconds(3), scheduler: DispatchQueue.main)
+
+                default:
+                    return Just($0)
+                        .debounce(for: .seconds(0), scheduler: DispatchQueue.main)
+                }
+            }
            .sink {
                print("Event:", $0)
            }
            .store(in: &self.cancellables)
    }

Figured it out.

Can you post your solution here? I'm facing the same issue.

Hi. I have no time to completely solve the task, but I can share similar operator that I implemented in RxSwift. You can implement your own using Combine.

extension ObservableType where Element: Hashable {
  public func debounceEachUniqueElement(_ dueTime: RxTimeInterval, scheduler: some SchedulerType) -> Observable<Element> {
    Observable<Element>.create { observer -> any Disposable in
      let debouncer = ElementDebouncer<Element>(observer: observer)
      
      let disposable = self.subscribe(onNext: { element in
        debouncer.lockAndDo {
          let token = UUID()
          debouncer._addPending(element: element, token: token)
          let schedulerDisp = scheduler.scheduleRelative((element, token), dueTime: dueTime, action: debouncer.propagate)
          debouncer._addDisposable(schedulerDisp, token: token)
        }
      }, onError: { error in
        debouncer.lockAndDo {
          debouncer._onTerminationCleanResources()
          
          observer.onError(error)
        }
      }, onCompleted: {
        debouncer.lockAndDo {
          debouncer._onCompletedEmitAllPendingValues()
          debouncer._onTerminationCleanResources()
          
          observer.onCompleted()
        }
      }, onDisposed: nil)
      
      return disposable
    } // .create end
  }
}

fileprivate final class ElementDebouncer<T: Hashable> {
  private let _lock = NSRecursiveLock()
  
  private var pendingElementsQueue: [T: UUID] = [:]
  private var disposables: [UUID: any Disposable] = [:]
  
  private let observer: AnyObserver<T>
  
  init(observer: AnyObserver<T>) {
    self.observer = observer
  }
  
  func lockAndDo(_ action: VoidClosure) {
    _lock.withLockVoid(action)
  }
  
  func _addPending(element: T, token: UUID) {
    pendingElementsQueue[element] = token
  }
  
  func _addDisposable(_ disposable: any Disposable, token: UUID) {
    disposables[token] = disposable
  }
  
  func _onTerminationCleanResources() {
    disposables.forEach { _, disposable in disposable.dispose() }
    disposables = [:]
    
    pendingElementsQueue = [:]
  }
  
  func _onCompletedEmitAllPendingValues() {
    pendingElementsQueue.forEach { element, _ in observer.onNext(element) }
  }
  
  func propagate(_ args: (element: T, scheduledToken: UUID)) -> any Disposable {
    _lock.lock(); defer { _lock.unlock() }
    
    let (element, scheduledToken) = args
    
    disposables[scheduledToken]?.dispose()
    disposables[scheduledToken] = nil
    
    if let currentToken = pendingElementsQueue[element], currentToken == scheduledToken {
      pendingElementsQueue[element] = nil
      observer.onNext(element)
    }
    
    assertError(pendingElementsQueue.count < 100, ConditionalError(code: .excessOfLimit))
    
    return Disposables.create()
  }
}

This operator debounces each element separately. As if each unique element had its own timer that is restarted when the element is emitted again while timer is not completed.
You can also see standard debounce operator as a reference: https://github.com/ReactiveX/RxSwift/blob/main/RxSwift/Observables/Debounce.swift

In your case you need to add a closure for making a decision to debounce or not. Something like:
subject.debounce(dueTime: Duration, shouldDebounce: @escaping (Element) -> Bool)
or
subject.debounce(shouldDebounce: @escaping (Element) -> Duration?) – there is no debounce when nil and debounce with provided time interval if not nil. This also gives the ability to provide different debounce intervals for different elements.

Thanks, with your insights I ended up with this. I don't need to track each elements separately, instead simply overriding the debounce with the condition:

public extension Publisher {
  func debounce<S>(
    for dueTime: S.SchedulerTimeType.Stride,
    scheduler: S,
    options: S.SchedulerOptions? = nil,
    shouldDebounce: @escaping (Output) -> Bool
  ) -> AnyPublisher<Output, Failure> where S: Scheduler {
    map { output in
      guard shouldDebounce(output) else {
        return Just(output)
          .eraseToAnyPublisher()
      }
      return Just(output)
        .delay(for: dueTime, scheduler: scheduler)
        .eraseToAnyPublisher()
    }
    .switchToLatest()
    .eraseToAnyPublisher()
  }
}