Synchronous Publishers: How do they work?

Consider a hypothetical publisher Always<Output>:

public struct Always<Output>: Publisher {
  public typealias Failure = Never

  public init(_ output: Output)

  public func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Failure
}

It's a lot like Just, except that instead of producing a single output and then completing, it continues to produce the same value as long as there is demand.

I took a stab at implementing it. Here's a snippet from the internal Subscription class:

extension Always.Subscription: Subscriber {
  func request(_ demand: Subscribers.Demand) 
    // assume an instance variable `subscriber`
    var demand = demand
    while demand > 0 {
      demand -= 1
      demand += subscriber.receive(output)
    }
  }
}

If I attach to this publisher using sink, this code results in an endless loop, as expected. However, even this code results in an infinite loop:

// Expected: `first` should request a single value then complete
// Actual: `first` forwards the unlimited demand, resulting in an endless loop!
Always(1).first().sink { value in
  print(value)
}

Curiously, if I simulate this publisher using Publishers.Sequence, everything works just fine:

let ones = sequence(first: 1) { $0 }
ones.publisher.first().sink { value in
  print(value)
}

How does Publishers.Sequence achieve this? What's the mistake in my implementation of Always.Subscription, and more generally, how should a synchronous publisher be implemented such that it respects demand?

Maybe ones.publisher provides a custom implementation of first()? How does ones.publisher.eraseToAnyPublisher().first() behave?

Tacking on a print operator to the ones example gives a possible hint.

let ones = sequence(first: 1) { $0 }
ones
	.publisher
	.print()
	.first()
	.sink { value in

	print(value)
}

Logs

receive subscription: (Sequence)
request unlimited
receive value: (1)
receive cancel
1

Unlimited demand is requested in both situations. But, after that first value is received, a finished event is received downstream, which then propagates cancellations back up.

Pointing to Always.Subscription’s cancel method as a possible route to break out of the loop. Here’s a rough sketch that restores the expected behavior.

public struct Always<Output>: Publisher {
	public typealias Failure = Never

	private let output: Output

	public init(_ output: Output) {
		self.output = output
	}

	public func receive<Subscriber: Combine.Subscriber>(subscriber: Subscriber)
		where Subscriber.Input == Output, Subscriber.Failure == Failure {

		subscriber.receive(
			subscription: Subscription(
				subscriber: subscriber,
				output: output
			)
		)
	}
}

extension Always {
	final class Subscription<Subscriber: Combine.Subscriber>: Combine.Subscription
		where Subscriber.Input == Output {

		private let subscriber: Subscriber
		private let output: Output

		private var cancelled = false

		init(
			subscriber: Subscriber,
			output: Output
		) {
			self.subscriber = subscriber
			self.output = output
		}

		func request(_ demand: Subscribers.Demand) {
			var demand = demand

			while !cancelled && demand > 0 {
				demand -= 1
				demand += subscriber.receive(output)
			}
		}

		func cancel() { cancelled = true }
	}
}

And of course this is totally a guess, but hope it helps!

2 Likes

@gwendal.roue Good thought, but unfortunately no difference in behaviour when using eraseToAnyPublisher().first().

@jasdev I think you might be on the money there!

Here's the final version: instead of tracking a separate isCancelled property, I've made the subscriber optional so it can be set to nil when cancelled. https://gist.github.com/sharplet/ddf2debb7eccff40d307027f4c6d9f0c

extension Always.Subscription: Cancellable {
  func cancel() {
    subscriber = nil
  }
}

extension Always.Subscription: Subscription {
  func request(_ demand: Subscribers.Demand) {
    var demand = demand
    while let subscriber = subscriber, demand > 0 {
      demand -= 1
      demand += subscriber.receive(output)
    }
  }
}
3 Likes

(grumbles) freakin’ reentrancy

1 Like
Terms of Service

Privacy Policy

Cookie Policy