danmihai
(Dan)
1
I know you're not supposed to write Publishers in Combine, but my idea was to better understand how they work by writing one.
This publishers should infinitely yield integers, incremented by two from a start point:
startPoint: 4
yields: 4,6,8....
What do you think of my approach?:
struct ByTwo {
var startPoint = 0
var isDone = false
init(from startPoint: Int) {
self.startPoint = startPoint
}
init() { }
}
extension ByTwo: Sequence, IteratorProtocol {
mutating func next() -> Int? {
if isDone {
return nil
}
defer { startPoint += 2 }
return startPoint
}
}
extension ByTwo: Publisher {
typealias Output = Int
typealias Failure = Never
func receive<S>(subscriber: S) where S : Subscriber, Never == S.Failure, Int == S.Input {
let subscription = ByTwoSubscription(sequence: self, subscriber: subscriber)
subscriber.receive(subscription: subscription)
}
}
class ByTwoSubscription: Subscription {
private var sequence: any Sequence<Int>
private lazy var runningTask = Task { [unowned self] in
for i in sequence {
while self.shouldPasue {
await Task.yield()
}
switch self.counter {
case 0:
return
case -1:
break;
default:
self.counter -= 1
}
self.subscriber.receive(i)
}
}
private let subscriber: any Subscriber<Int, Never>
private var counter = -1
private var shouldPasue = true
init(sequence: some Sequence<Int>, subscriber: some Subscriber<Int, Never>) {
self.sequence = sequence
self.subscriber = subscriber
}
func request(_ demand: Subscribers.Demand) {
print("here")
if demand == .unlimited {
shouldPasue = false
runningTask
}
if let mx = demand.max {
shouldPasue = false
counter = mx
}
if demand == .none {
shouldPasue = true
}
}
func cancel() {
runningTask.cancel()
}
}
You might notice that the isDone property of the publisher is not used... this is because in the end, I needed to put more logic in the subscription, rather than the publisher, simply because I have no idea, once a demand request comes in, how to communicate back to the publisher