Hi. I have no time to completely solve the task, but I can share similar operator that I implemented in RxSwift. You can implement your own using Combine.
extension ObservableType where Element: Hashable {
public func debounceEachUniqueElement(_ dueTime: RxTimeInterval, scheduler: some SchedulerType) -> Observable<Element> {
Observable<Element>.create { observer -> any Disposable in
let debouncer = ElementDebouncer<Element>(observer: observer)
let disposable = self.subscribe(onNext: { element in
debouncer.lockAndDo {
let token = UUID()
debouncer._addPending(element: element, token: token)
let schedulerDisp = scheduler.scheduleRelative((element, token), dueTime: dueTime, action: debouncer.propagate)
debouncer._addDisposable(schedulerDisp, token: token)
}
}, onError: { error in
debouncer.lockAndDo {
debouncer._onTerminationCleanResources()
observer.onError(error)
}
}, onCompleted: {
debouncer.lockAndDo {
debouncer._onCompletedEmitAllPendingValues()
debouncer._onTerminationCleanResources()
observer.onCompleted()
}
}, onDisposed: nil)
return disposable
} // .create end
}
}
fileprivate final class ElementDebouncer<T: Hashable> {
private let _lock = NSRecursiveLock()
private var pendingElementsQueue: [T: UUID] = [:]
private var disposables: [UUID: any Disposable] = [:]
private let observer: AnyObserver<T>
init(observer: AnyObserver<T>) {
self.observer = observer
}
func lockAndDo(_ action: VoidClosure) {
_lock.withLockVoid(action)
}
func _addPending(element: T, token: UUID) {
pendingElementsQueue[element] = token
}
func _addDisposable(_ disposable: any Disposable, token: UUID) {
disposables[token] = disposable
}
func _onTerminationCleanResources() {
disposables.forEach { _, disposable in disposable.dispose() }
disposables = [:]
pendingElementsQueue = [:]
}
func _onCompletedEmitAllPendingValues() {
pendingElementsQueue.forEach { element, _ in observer.onNext(element) }
}
func propagate(_ args: (element: T, scheduledToken: UUID)) -> any Disposable {
_lock.lock(); defer { _lock.unlock() }
let (element, scheduledToken) = args
disposables[scheduledToken]?.dispose()
disposables[scheduledToken] = nil
if let currentToken = pendingElementsQueue[element], currentToken == scheduledToken {
pendingElementsQueue[element] = nil
observer.onNext(element)
}
assertError(pendingElementsQueue.count < 100, ConditionalError(code: .excessOfLimit))
return Disposables.create()
}
}
In your case you need to add a closure for making a decision to debounce or not. Something like: subject.debounce(dueTime: Duration, shouldDebounce: @escaping (Element) -> Bool)
or subject.debounce(shouldDebounce: @escaping (Element) -> Duration?) – there is no debounce when nil and debounce with provided time interval if not nil. This also gives the ability to provide different debounce intervals for different elements.
Thanks, with your insights I ended up with this. I don't need to track each elements separately, instead simply overriding the debounce with the condition: