somu
(somu)
1
Hi,
Overview
I have an AsyncStream that uses a RandomNumberGenerator.
RandomNumberGenerator uses a publisher to publish random numbers
While creating an AsyncStream and subscribing to the random generator's publisher I wasn't sure where to store the cancellable returned by the sink
Problem
If store the cancellable inside the AsyncStream's initialiser then it gets cancelled immediately since cancellable will be destroyed
Workaround
I have declared canceller outside the stream which works but doesn't seem like the right way
Question
Is there a better way to do this (without using closures, would prefer to use publisher if possible, otherwise let me know if there is no way out)?
Code
import Foundation
import Combine
//Keeping the canceller inside AsyncStream initialiser closure would cause the publisher to be cancelled
//Workaround: I have declared canceller outside the stream which works but doesn't seem like the right way
var canceller: AnyCancellable?
let stream = AsyncStream<Int> { continuation in
let generator = RandomNumberGenerator()
//If canceller was declared as a local variable, the publisher would be cancelled immediately as the canceller variable would go out of scope
canceller = generator.$newValue
.compactMap { $0 }
.sink { newValue in
continuation.yield(newValue)
}
continuation.onTermination = { continuation in
generator.stop()
}
generator.start()
}
class RandomNumberGenerator {
var newValueHandler: ((Int) -> ())?
@Published var newValue = 0
private var timer: Timer?
func start() {
timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true) { [weak self] timer in
guard let randomNumber = Self.generate() else {
return
}
self?.newValue = randomNumber
self?.newValueHandler?(randomNumber)
}
}
func stop() {
timer?.invalidate()
timer = nil
}
private static func generate() -> Int? {
(1000..<9999).randomElement()
}
}
for await value in stream {
print(value)
}
RunLoop.main.run()
mbrandonw
(Brandon Williams)
2
You can capture the canceller in the onTermination closure in order to guarantee that it lives for as long as the stream:
let canceller = ...
continuation.onTermination = { continuation in
_ = canceller
...
}
2 Likes
somu
(somu)
3
@mbrandonw wow ... never occurred to me, thanks a lot!!
From your suggestion, I suppose just referring to it in the onTermination prevents it from expiring.
Based on your suggestion I thought the following also could be done:
continuation.onTermination = { continuation in
generator.stop()
canceller.cancel()
}
mbrandonw
(Brandon Williams)
4
Yeah, that's a good idea to call canceller.cancel() in there. Basically, onTermination should be used to capture resources you want to live for as long as the stream, and to clean up resources once the stream completes or is cancelled.
2 Likes
import AsyncAlgorithms
let randomNumbers = AsyncTimerSequence(interval: .seconds(1), clock: .suspending).compactMap { _ in (1000..<9999).randomElement() }
for await value in randomNumbers {
print(value)
}
RunLoop.main.run()
That should be isomorphic to what you are doing btw; except that it wouldn't need to buffer values.
1 Like
somu
(somu)
6
Thanks a lot @Philippe_Hausler
Is it part of GitHub - apple/swift-async-algorithms: Async Algorithms for Swift ?
I only knew swift-algorithms package existed, didn't know that swift-async-algorithms also exists!
Is there any WWDC video that goes through this?
Yep; AsyncTimerSequence is from there.
1 Like