I just discovered that calling `.collect(.byTime)` on a Combine publisher starts a repeating timer that keeps firing until you unsubscribe.
I was expecting the timer to be scheduled only when there are incoming items. Is there a way to get `.collect(.byTime)`-like behavior without an always-repeating timer?
1 Like
I've included my current, not very well tested, solution.
This is to solve the problem:
- You have background work being performed and generating a sequence of results.
- You have a UI that you want to update with those results (you want them all, not just last/first)
- You want to batch process the results, it's too expensive to schedule each result separately on the main queue.
I would be interested to know if there is a better solution with less custom code for this. Combine's `.collect(.byTime)` was working well for me, but it suffers from a performance problem: it schedules a repeating timer, so your app is always using CPU even when no events are coming through the publisher.
The following code is a modification of CombineX's CollectByTime. It seems to work and doesn't use CPU when no values are flowing, but I don't really know what I'm doing when it comes to parts of it; hopefully I copied/pasted correctly.
Suggestions welcome! :)
import Combine
extension Publisher {
    /// Collects upstream values into arrays and delivers them on `scheduler`.
    ///
    /// - Parameter scheduler: The scheduler on which collected batches are delivered downstream.
    /// - Returns: A publisher whose output is `[Output]` and whose failure type matches this publisher's.
    public func collectOnScheduler<S: Scheduler>(_ scheduler: S) -> Publishers.CollectOnScheduler<Self, S> {
        Publishers.CollectOnScheduler(upstream: self, scheduler: scheduler)
    }
}
extension Publishers {
    /// A publisher that buffers the values of `upstream` and republishes them as arrays.
    ///
    /// Batches are handed to the downstream subscriber through `context.schedule`;
    /// no repeating timer is involved.
    public struct CollectOnScheduler<Upstream: Publisher, Context: Scheduler>: Publisher {
        /// Each emitted element is a batch of upstream values.
        public typealias Output = [Upstream.Output]
        /// Failures are passed through from upstream unchanged.
        public typealias Failure = Upstream.Failure

        /// The publisher whose values are collected.
        public let upstream: Upstream
        /// The scheduler on which batches are delivered.
        public let context: Context

        /// Creates a collecting publisher.
        ///
        /// - Parameters:
        ///   - upstream: The publisher whose values are collected.
        ///   - scheduler: The scheduler on which batches are delivered.
        public init(upstream: Upstream, scheduler: Context) {
            self.upstream = upstream
            self.context = scheduler
        }

        /// Attaches `subscriber` by inserting a buffering relay between it and `upstream`.
        public func receive<S>(subscriber: S)
        where S: Subscriber, Upstream.Failure == S.Failure, S.Input == [Upstream.Output] {
            let relay = ByTime(pub: self, sub: subscriber)
            self.upstream.subscribe(relay)
        }
    }
}
extension Publishers.CollectOnScheduler {
    /// Relay that sits between the upstream publisher and the downstream subscriber.
    ///
    /// Incoming values are appended to `buffer`. A single flush is scheduled on
    /// `context` only when there is something to deliver *and* downstream has
    /// outstanding demand, so nothing runs on the scheduler while the pipeline
    /// is idle or while downstream is not ready for another batch.
    private final class ByTime<S>: Subscription,
        Subscriber,
        CustomStringConvertible,
        CustomDebugStringConvertible
    where
        S: Subscriber,
        S.Input == Output,
        S.Failure == Failure {

        typealias Input = Upstream.Output
        typealias Failure = Upstream.Failure
        typealias Pub = Publishers.CollectOnScheduler<Upstream, Context>
        typealias Sub = S

        /// Guards every mutable property below.
        let lock = RecursiveLock()
        /// The downstream subscriber receiving batches.
        let sub: Sub
        /// The scheduler on which batches and the completion are delivered.
        let context: Context
        /// Lifecycle of the upstream subscription.
        var state = RelayState.waiting
        /// Number of batches downstream is still willing to receive.
        var demand: Subscribers.Demand = .none
        /// `true` while a flush is pending on `context`; prevents double scheduling.
        var isSendScheduled = false
        /// Values received from upstream and not yet delivered.
        var buffer: [Input] = []

        init(pub: Pub, sub: Sub) {
            self.sub = sub
            self.context = pub.context
        }

        deinit {
            lock.cleanupLock()
        }

        // MARK: Subscription

        /// Records downstream demand (counted in batches) and asks upstream for one more value.
        func request(_ demand: Subscribers.Demand) {
            self.lock.lock()
            guard let subscription = self.state.subscription else {
                self.lock.unlock()
                return
            }
            self.demand += demand
            // Values may have been buffered while downstream had no demand;
            // now that demand exists, make sure a flush is pending.
            self.scheduleSendBufferIfNeeded()
            self.lock.unlock()
            subscription.request(.max(1))
        }

        /// Cancels the upstream subscription and discards any undelivered values.
        func cancel() {
            self.lock.lock()
            let subscription = self.state.complete()
            // Nothing will be delivered after cancellation, so release the buffered values now.
            self.buffer = []
            self.lock.unlock()
            subscription?.cancel()
        }

        // MARK: Subscriber

        /// Accepts the upstream subscription once; any further subscription is cancelled.
        func receive(subscription: Subscription) {
            guard self.lock.withLockGet(self.state.relay(subscription)) else {
                subscription.cancel()
                return
            }
            self.sub.receive(subscription: self)
        }

        /// Buffers `input` and makes sure a flush is scheduled if downstream can take a batch.
        ///
        /// - Returns: `.max(1)` while relaying, so upstream keeps producing; `.none` otherwise.
        func receive(_ input: Input) -> Subscribers.Demand {
            self.lock.lock()
            guard self.state.isRelaying else {
                self.lock.unlock()
                return .none
            }
            self.buffer.append(input)
            self.scheduleSendBufferIfNeeded()
            self.lock.unlock()
            return .max(1)
        }

        /// Schedules one flush on `context` when all of the following hold:
        /// the relay is active, no flush is already pending, there are buffered
        /// values, and downstream has outstanding demand.
        ///
        /// The demand check matters: without it a flush that finds no demand
        /// would immediately reschedule itself and spin on the scheduler until
        /// downstream requests more. `request(_:)` calls this method, so a
        /// flush is scheduled as soon as demand arrives.
        private func scheduleSendBufferIfNeeded() {
            self.lock.lock()
            defer { self.lock.unlock() }

            guard self.state.isRelaying,
                  !self.isSendScheduled,
                  !self.buffer.isEmpty,
                  self.demand > 0
            else {
                return
            }

            self.isSendScheduled = true
            self.context.schedule { [weak self] in
                self?.sendBuffer()
            }
        }

        /// Delivers the currently buffered values to downstream as a single batch.
        /// Runs on `context`.
        private func sendBuffer() {
            self.lock.lock()
            self.isSendScheduled = false

            guard self.state.isRelaying, self.demand > 0, !self.buffer.isEmpty else {
                // Nothing to do right now. A later `receive(_:)` or `request(_:)`
                // schedules the next flush when it becomes possible.
                self.lock.unlock()
                return
            }

            let batch = self.buffer
            self.buffer.removeAll(keepingCapacity: true)
            self.demand -= 1
            self.lock.unlock()

            // Call downstream without holding the lock.
            let more = self.sub.receive(batch)

            self.lock.lock()
            if more > 0 {
                self.demand += more
            }
            // Values may have arrived while downstream was processing the batch.
            self.scheduleSendBufferIfNeeded()
            self.lock.unlock()
        }

        /// Forwards the upstream completion to downstream on `context`.
        ///
        /// On `.finished`, values still in the buffer are delivered as one final
        /// batch before the completion (without consulting `demand`). On failure
        /// the buffered values are dropped.
        func receive(completion: Subscribers.Completion<Failure>) {
            self.lock.lock()
            guard let subscription = self.state.complete() else {
                self.lock.unlock()
                return
            }
            // Take the remaining values while still holding the lock so this
            // never races with other accesses to `buffer`.
            let remaining: [Input]
            if case .finished = completion {
                remaining = self.buffer
            } else {
                remaining = []
            }
            self.buffer = []
            self.lock.unlock()

            subscription.cancel()

            // `self` is captured strongly on purpose: the relay must stay alive
            // until the completion has been delivered.
            self.context.schedule {
                if !remaining.isEmpty {
                    _ = self.sub.receive(remaining)
                }
                self.sub.receive(completion: completion)
            }
        }

        // MARK: Descriptions

        var description: String {
            return "CollectOnScheduler"
        }

        var debugDescription: String {
            return "CollectOnScheduler"
        }
    }
}
1 Like