/// Interface sketch of a publisher that keeps producing the same value for as long as there is demand.
public struct Always<Output>: Publisher {
/// The failure type is `Never`.
public typealias Failure = Never
/// Creates a publisher for the given `output` value.
public init(_ output: Output)
/// Attaches `subscriber`, whose input must be `Output` and whose failure must be `Failure`.
public func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Failure
}
It's a lot like Just, except that instead of producing a single output and then completing, it continues to produce the same value as long as there is demand.
I took a stab at implementing it. Here's a snippet from the internal Subscription class:
If I attach to this publisher using sink, this code results in an endless loop, as expected. However, even this code results in an infinite loop:
// Expected: `first` takes exactly one value and then completes.
// Actual: `first` passes the unlimited demand along, so this never terminates!
Always(1).first().sink { print($0) }
Curiously, if I simulate this publisher using Publishers.Sequence, everything works just fine:
// An endless run of 1s built with the standard library's `sequence(first:next:)`.
let ones = sequence(first: 1, next: { $0 })
ones.publisher.first().sink { print($0) }
How does Publishers.Sequence achieve this? What's the mistake in my implementation of Always.Subscription, and more generally, how should a synchronous publisher be implemented such that it respects demand?
Unlimited demand is requested in both situations. However, once that first value has been received, `first()` sends a finished event downstream, and a cancellation then propagates back upstream.
That points to `Always.Subscription`'s `cancel()` method as a possible route to break out of the loop. Here's a rough sketch that restores the expected behavior:
/// A publisher that hands every subscriber a subscription carrying one fixed value.
public struct Always<Output>: Publisher {
    /// This publisher never fails.
    public typealias Failure = Never

    /// The value passed along to each subscription this publisher creates.
    private let output: Output

    /// Creates a publisher around `output`.
    public init(_ output: Output) {
        self.output = output
    }

    /// Builds a new `Subscription` for `subscriber` and delivers it through
    /// `receive(subscription:)`.
    public func receive<Downstream: Combine.Subscriber>(subscriber: Downstream)
    where Downstream.Input == Output, Downstream.Failure == Failure {
        let subscription = Subscription(subscriber: subscriber, output: output)
        subscriber.receive(subscription: subscription)
    }
}
extension Always {
    /// Delivers the same `output` value to one subscriber for as long as that
    /// subscriber has outstanding demand and the subscription has not been cancelled.
    ///
    /// NOTE(review): no locking is performed here; this assumes `request(_:)` and
    /// `cancel()` are not called concurrently from different threads.
    final class Subscription<Subscriber: Combine.Subscriber>: Combine.Subscription
    where Subscriber.Input == Output {
        /// The downstream subscriber; set to `nil` by `cancel()` so it is no longer retained.
        private var subscriber: Subscriber?
        /// The value sent for every unit of demand.
        private let output: Output
        /// Demand that has been requested but not yet satisfied.
        private var pending: Subscribers.Demand = .none
        /// `true` while the delivery loop in `request(_:)` is running.
        private var isDraining = false

        /// - Parameters:
        ///   - subscriber: The subscriber that will receive `output`.
        ///   - output: The value to deliver repeatedly.
        init(
            subscriber: Subscriber,
            output: Output
        ) {
            self.subscriber = subscriber
            self.output = output
        }

        /// Records `demand` and delivers values until the recorded demand is
        /// exhausted or the subscription is cancelled.
        ///
        /// - Parameter demand: The additional number of values the subscriber wants.
        func request(_ demand: Subscribers.Demand) {
            pending += demand

            // If the subscriber calls `request(_:)` from inside `receive(_:)`, the loop
            // below is already running further up the stack. The extra demand has been
            // recorded above, so return instead of starting a nested loop (which could
            // otherwise recurse without bound).
            guard !isDraining else { return }
            isDraining = true
            defer { isDraining = false }

            // `subscriber` is re-read on every iteration so that a `cancel()` issued
            // from within `receive(_:)` stops delivery before the next value.
            while let downstream = subscriber, pending > 0 {
                pending -= 1
                pending += downstream.receive(output)
            }
        }

        /// Stops delivery and releases the subscriber.
        func cancel() { subscriber = nil }
    }
}
And of course this is totally a guess, but hope it helps!