Synchronous Publishers: How do they work?

Tacking on a print operator to the ones example gives a possible hint.

let ones = sequence(first: 1) { $0 }
ones
	.publisher
	.print()
	.first()
	.sink { value in

	print(value)
}

Logs

receive subscription: (Sequence)
request unlimited
receive value: (1)
receive cancel
1

Unlimited demand is requested in both situations. But, after that first value is received, a finished event is received downstream, which then propagates cancellations back up.

Pointing to Always.Subscription’s cancel method as a possible route to break out of the loop. Here’s a rough sketch that restores the expected behavior.

public struct Always<Output>: Publisher {
	public typealias Failure = Never

	private let output: Output

	public init(_ output: Output) {
		self.output = output
	}

	public func receive<Subscriber: Combine.Subscriber>(subscriber: Subscriber)
		where Subscriber.Input == Output, Subscriber.Failure == Failure {

		subscriber.receive(
			subscription: Subscription(
				subscriber: subscriber,
				output: output
			)
		)
	}
}

extension Always {
	final class Subscription<Subscriber: Combine.Subscriber>: Combine.Subscription
		where Subscriber.Input == Output {

		private let subscriber: Subscriber
		private let output: Output

		private var cancelled = false

		init(
			subscriber: Subscriber,
			output: Output
		) {
			self.subscriber = subscriber
			self.output = output
		}

		func request(_ demand: Subscribers.Demand) {
			var demand = demand

			while !cancelled && demand > 0 {
				demand -= 1
				demand += subscriber.receive(output)
			}
		}

		func cancel() { cancelled = true }
	}
}

And of course this is totally a guess, but hope it helps!

2 Likes