sharplet
(Adam Sharp)
1
Consider a hypothetical publisher Always<Output>:
public struct Always<Output>: Publisher {
public typealias Failure = Never
public init(_ output: Output)
public func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Failure
}
It's a lot like Just, except that instead of producing a single output and then completing, it continues to produce the same value as long as there is demand.
I took a stab at implementing it. Here's a snippet from the internal Subscription class:
extension Always.Subscription: Subscriber {
func request(_ demand: Subscribers.Demand)
// assume an instance variable `subscriber`
var demand = demand
while demand > 0 {
demand -= 1
demand += subscriber.receive(output)
}
}
}
If I attach to this publisher using sink, this code results in an endless loop, as expected. However, even this code results in an infinite loop:
// Expected: `first` should request a single value then complete
// Actual: `first` forwards the unlimited demand, resulting in an endless loop!
Always(1).first().sink { value in
print(value)
}
Curiously, if I simulate this publisher using Publishers.Sequence, everything works just fine:
let ones = sequence(first: 1) { $0 }
ones.publisher.first().sink { value in
print(value)
}
How does Publishers.Sequence achieve this? What's the mistake in my implementation of Always.Subscription, and more generally, how should a synchronous publisher be implemented such that it respects demand?
Maybe ones.publisher provides a custom implementation of first()? How does ones.publisher.eraseToAnyPublisher().first() behave?
jasdev
(Jasdev Singh)
3
Tacking on a print operator to the ones example gives a possible hint.
let ones = sequence(first: 1) { $0 }
ones
.publisher
.print()
.first()
.sink { value in
print(value)
}
Logs
receive subscription: (Sequence)
request unlimited
receive value: (1)
receive cancel
1
Unlimited demand is requested in both situations. But, after that first value is received, a finished event is received downstream, which then propagates cancellations back up.
Pointing to Always.Subscription’s cancel method as a possible route to break out of the loop. Here’s a rough sketch that restores the expected behavior.
public struct Always<Output>: Publisher {
public typealias Failure = Never
private let output: Output
public init(_ output: Output) {
self.output = output
}
public func receive<Subscriber: Combine.Subscriber>(subscriber: Subscriber)
where Subscriber.Input == Output, Subscriber.Failure == Failure {
subscriber.receive(
subscription: Subscription(
subscriber: subscriber,
output: output
)
)
}
}
extension Always {
final class Subscription<Subscriber: Combine.Subscriber>: Combine.Subscription
where Subscriber.Input == Output {
private let subscriber: Subscriber
private let output: Output
private var cancelled = false
init(
subscriber: Subscriber,
output: Output
) {
self.subscriber = subscriber
self.output = output
}
func request(_ demand: Subscribers.Demand) {
var demand = demand
while !cancelled && demand > 0 {
demand -= 1
demand += subscriber.receive(output)
}
}
func cancel() { cancelled = true }
}
}
And of course this is totally a guess, but hope it helps!
2 Likes
sharplet
(Adam Sharp)
4
@gwendal.roue Good thought, but unfortunately no difference in behaviour when using eraseToAnyPublisher().first().
@jasdev I think you might be on the money there!
Here's the final version: instead of tracking a separate isCancelled property, I've made the subscriber optional so it can be set to nil when cancelled. A Combine publisher that repeatedly emits the same value as long as there is demand · GitHub
extension Always.Subscription: Cancellable {
func cancel() {
subscriber = nil
}
}
extension Always.Subscription: Subscription {
func request(_ demand: Subscribers.Demand) {
var demand = demand
while let subscriber = subscriber, demand > 0 {
demand -= 1
demand += subscriber.receive(output)
}
}
}
3 Likes
sharplet
(Adam Sharp)
5
(grumbles) freakin’ reentrancy
1 Like