Consider a hypothetical publisher Always<Output>:
public struct Always<Output>: Publisher {
    public typealias Failure = Never
    public init(_ output: Output)
    public func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Failure
}
It's a lot like Just, except that instead of producing a single output and then completing, it continues to produce the same value as long as there is demand.
I took a stab at implementing it. Here's a snippet from the internal Subscription class:
extension Always.Subscription: Combine.Subscription {
    func request(_ demand: Subscribers.Demand) {
        // assume an instance variable `subscriber`
        var demand = demand
        while demand > 0 {
            demand -= 1
            demand += subscriber.receive(output)
        }
    }
}
If I attach to this publisher using sink, this code results in an endless loop, as expected. However, even this code results in an infinite loop:
// Expected: `first` should request a single value then complete
// Actual: `first` forwards the unlimited demand, resulting in an endless loop!
Always(1).first().sink { value in
    print(value)
}
Curiously, if I simulate this publisher using Publishers.Sequence, everything works just fine:
let ones = sequence(first: 1) { $0 }
ones.publisher.first().sink { value in
    print(value)
}
How does Publishers.Sequence achieve this? What's the mistake in my implementation of Always.Subscription, and more generally, how should a synchronous publisher be implemented such that it respects demand?
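For reference, here is a minimal sketch of one direction that might address this, assuming the loop never ends because the subscription ignores cancellation (an operator such as first may forward unlimited demand and then cancel upstream once it has its value). The nested class name `Inner` and the optional `subscriber` property are hypothetical choices for illustration, not part of the snippet above. The sketch is not thread-safe and does not guard against re-entrant calls to `request(_:)`.

```swift
import Combine

public struct Always<Output>: Publisher {
    public typealias Failure = Never
    private let output: Output

    public init(_ output: Output) {
        self.output = output
    }

    public func receive<S: Subscriber>(subscriber: S)
    where S.Input == Output, S.Failure == Failure {
        // Hand the subscriber a subscription; values flow only on request.
        subscriber.receive(subscription: Inner(output: output, subscriber: subscriber))
    }

    // Hypothetical name; plays the role of the internal Subscription class.
    private final class Inner<S: Subscriber>: Subscription
    where S.Input == Output, S.Failure == Never {
        private let output: Output
        private var subscriber: S?  // set to nil once cancelled

        init(output: Output, subscriber: S) {
            self.output = output
            self.subscriber = subscriber
        }

        func request(_ demand: Subscribers.Demand) {
            var demand = demand
            // Re-read `self.subscriber` on every pass: if the downstream
            // cancels from inside `receive(_:)`, the loop stops here.
            while let subscriber = self.subscriber, demand > 0 {
                demand -= 1
                demand += subscriber.receive(output)
            }
        }

        func cancel() {
            subscriber = nil
        }
    }
}

// Usage: with the cancellation check in place, this should terminate.
let cancellable = Always(1).first().sink { value in
    print(value)
}
```

The key design choice is that the loop condition checks both the remaining demand and whether the subscriber is still attached, so a cancellation issued while a value is being delivered takes effect before the next value is sent.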