I know you're not supposed to write Publishers
in Combine
, but my idea was to better understand how they work by writing one.
This publishers should infinitely yield integers, incremented by two from a start point:
startPoint: 4
yields: 4,6,8....
What do you think of my approach?:
struct ByTwo {
var startPoint = 0
var isDone = false
init(from startPoint: Int) {
self.startPoint = startPoint
}
init() { }
}
extension ByTwo: Sequence, IteratorProtocol {
mutating func next() -> Int? {
if isDone {
return nil
}
defer { startPoint += 2 }
return startPoint
}
}
extension ByTwo: Publisher {
typealias Output = Int
typealias Failure = Never
func receive<S>(subscriber: S) where S : Subscriber, Never == S.Failure, Int == S.Input {
let subscription = ByTwoSubscription(sequence: self, subscriber: subscriber)
subscriber.receive(subscription: subscription)
}
}
class ByTwoSubscription: Subscription {
private var sequence: any Sequence<Int>
private lazy var runningTask = Task { [unowned self] in
for i in sequence {
while self.shouldPasue {
await Task.yield()
}
switch self.counter {
case 0:
return
case -1:
break;
default:
self.counter -= 1
}
self.subscriber.receive(i)
}
}
private let subscriber: any Subscriber<Int, Never>
private var counter = -1
private var shouldPasue = true
init(sequence: some Sequence<Int>, subscriber: some Subscriber<Int, Never>) {
self.sequence = sequence
self.subscriber = subscriber
}
func request(_ demand: Subscribers.Demand) {
print("here")
if demand == .unlimited {
shouldPasue = false
runningTask
}
if let mx = demand.max {
shouldPasue = false
counter = mx
}
if demand == .none {
shouldPasue = true
}
}
func cancel() {
runningTask.cancel()
}
}
You might notice that the isDone
property of the publisher is not used... this is because in the end, I needed to put more logic in the subscription
, rather than the publisher, simply because I have no idea, once a demand request comes in, how to communicate back to the publisher