Combine's .collect(.byTime) schedules a repeating timer?

I just discovered that calling .collect(.byTime) on a combine publisher starts a repeating timer until you unsubscribe.

I was expecting the timer to only be scheduled when there are incoming items. Is there a way to get collect(.byTime) like behavior without an always repeating timer?

1 Like

I've included my current not very well tested solution.

This is to solve the problem:

  1. You have background work being performed and generating a sequence of results.
  2. You have a UI that you want to update with those results (you want them all, not just last/first)
  3. You want to batch process the results, it's too expensive to schedule each result separately on the main queue.

I would be interested to know if there is a better solution with less custom code for this. Combine's .collect(.byTime) was working well for me, but suffers from a performance problem: it schedules a repeating timer, so your app is always using CPU even when no events are coming through the publisher.

The following code is a modification of CombineX's CollectByTime. It seems to work and doesn't use CPU when no values are flowing, but I don't really know what I'm doing when it comes to parts of it, hopefully I copied/pasted correctly.

Suggestions welcome! :)

import Combine

extension Publisher {
    
    public func collectOnScheduler<S: Scheduler>(_ scheduler: S) -> Publishers.CollectOnScheduler<Self, S> {
        return .init(upstream: self, scheduler: scheduler)
    }
    
}

extension Publishers {
    
    public struct CollectOnScheduler<Upstream: Publisher, Context: Scheduler>: Publisher {
        
        public typealias Output = [Upstream.Output]
        public typealias Failure = Upstream.Failure
        
        public let upstream: Upstream
        public let context: Context
        
        public init(upstream: Upstream, scheduler: Context) {
            self.upstream = upstream
            self.context = scheduler
        }
        
        public func receive<S: Subscriber>(subscriber: S) where Upstream.Failure == S.Failure, S.Input == [Upstream.Output] {
            upstream.subscribe(ByTime(pub: self, sub: subscriber))
        }
        
    }
    
}

extension Publishers.CollectOnScheduler {
    
    private final class ByTime<S>: Subscription,
                                   Subscriber,
                                   CustomStringConvertible,
                                   CustomDebugStringConvertible
    where
    S: Subscriber,
    S.Input == Output,
    S.Failure == Failure {
        
        typealias Input = Upstream.Output
        typealias Failure = Upstream.Failure
        
        typealias Pub = Publishers.CollectOnScheduler<Upstream, Context>
        typealias Sub = S
        
        let lock = RecursiveLock()
        let sub: Sub
        let context: Context

        var state = RelayState.waiting
        var demand: Subscribers.Demand = .none
        var isSendScheduled = false
        var buffer: [Input] = []
        
        init(pub: Pub, sub: Sub) {
            self.sub = sub
            self.context = pub.context
        }
        
        deinit {
            lock.cleanupLock()
        }
        
        func request(_ demand: Subscribers.Demand) {
            self.lock.lock()
            guard let subscription = self.state.subscription else {
                self.lock.unlock()
                return
            }
            self.demand += demand
            self.lock.unlock()
            
            subscription.request(.max(1))
        }
        
        func cancel() {
            self.lock.withLockGet(self.state.complete())?.cancel()
        }
        
        func receive(subscription: Subscription) {
            guard self.lock.withLockGet(self.state.relay(subscription)) else {
                subscription.cancel()
                return
            }
            self.sub.receive(subscription: self)
        }
        
        func receive(_ input: Input) -> Subscribers.Demand {
            self.lock.lock()
            guard self.state.isRelaying else {
                self.lock.unlock()
                return .none
            }
            
            self.buffer.append(input)
            self.scheduleSendBufferIfNeeded()
            
            self.lock.unlock()
            
            return .max(1)
        }
        
        private func scheduleSendBufferIfNeeded() {
            self.lock.lock()
            if !isSendScheduled && !self.buffer.isEmpty {
                isSendScheduled = true
                self.context.schedule { [weak self] in
                    self?.sendBuffer()
                }
            }
            self.lock.unlock()
        }
        
        private func sendBuffer() {
            self.lock.lock()
            self.isSendScheduled = false
            
            guard self.state.isRelaying else {
                self.lock.unlock()
                return
            }
            
            defer {
                self.scheduleSendBufferIfNeeded()
            }
            
            guard self.demand > 0 else {
                self.lock.unlock()
                return
            }
            
            let buffer = self.buffer
            self.buffer.removeAll(keepingCapacity: true)
            
            if buffer.isEmpty {
                self.lock.unlock()
                return
            }
            
            self.demand -= 1
            self.lock.unlock()
            
            let more = self.sub.receive(buffer)
            
            if more > 0 {
                self.lock.withLock {
                    self.demand += more
                }
            }
        }
        
        func receive(completion: Subscribers.Completion<Failure>) {
            self.lock.lock()
            guard let subscription = self.state.complete() else {
                self.lock.unlock()
                return
            }
            
            self.lock.unlock()
            
            subscription.cancel()
            
            var output: [Input]?
            if case .finished = completion, !self.buffer.isEmpty {
                output = self.buffer
            }
            self.buffer = []
            
            self.context.schedule {
                if let output = output {
                    _ = self.sub.receive(output)
                }
                self.sub.receive(completion: completion)
            }
        }
        
        var description: String {
            return "CollectOnScheduler"
        }
        
        var debugDescription: String {
            return "CollectOnScheduler"
        }
    }
}
1 Like