AsyncStream with Combine

Hi,

Overview

I have an AsyncStream that uses a RandomNumberGenerator.

RandomNumberGenerator uses a publisher to publish random numbers

While creating an AsyncStream and subscribing to the random generator's publisher I wasn't sure where to store the cancellable returned by the sink

Problem

If store the cancellable inside the AsyncStream's initialiser then it gets cancelled immediately since cancellable will be destroyed

Workaround

I have declared canceller outside the stream which works but doesn't seem like the right way

Question

Is there a better way to do this (without using closures, would prefer to use publisher if possible, otherwise let me know if there is no way out)?

Code

import Foundation
import Combine

//Keeping the canceller inside AsyncStream initialiser closure would cause the publisher to be cancelled
//Workaround: I have declared canceller outside the stream which works but doesn't seem like the right way
var canceller: AnyCancellable?

let stream = AsyncStream<Int> { continuation in
    let generator = RandomNumberGenerator()
    
    //If canceller was declared as a local variable, the publisher would be cancelled immediately as the canceller variable would go out of scope
    canceller = generator.$newValue
        .compactMap { $0 }
        .sink { newValue in
            continuation.yield(newValue)
        }
    
    continuation.onTermination = { continuation in
        generator.stop()
    }

    generator.start()
}

class RandomNumberGenerator {
    
    var newValueHandler: ((Int) -> ())?
    @Published var newValue = 0
    
    private var timer: Timer?
    
    func start() {
        timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true) { [weak self] timer in
            guard let randomNumber = Self.generate() else {
                return
            }
            self?.newValue = randomNumber
            self?.newValueHandler?(randomNumber)
        }
    }
    
    func stop() {
        timer?.invalidate()
        timer = nil
    }
    
    private static func generate() -> Int? {
        (1000..<9999).randomElement()
    }
}


for await value in stream {
    print(value)
}

RunLoop.main.run()

You can capture the canceller in the onTermination closure in order to guarantee that it lives for as long as the stream:

let canceller = ...

continuation.onTermination = { continuation in
  _ = canceller
  ...
}
2 Likes

@mbrandonw wow ... never occurred to me, thanks a lot!!

From your suggestion, I suppose just referring to it in the onTermination prevents it from expiring.

Based on your suggestion I thought the following also could be done:

continuation.onTermination = { continuation in
    generator.stop()
    canceller.cancel()
}

Yeah, that's a good idea to call canceller.cancel() in there. Basically, onTermination should be used to capture resources you want to live for as long as the stream, and to clean up resources once the stream completes or is cancelled.

2 Likes
import AsyncAlgorithms

let randomNumbers = AsyncTimerSequence(interval: .seconds(1), clock: .suspending).compactMap { _ in (1000..<9999).randomElement() }

for await value in randomNumbers {
  print(value)
}

RunLoop.main.run()

That should be isomorphic to what you are doing btw; except that it wouldn't need to buffer values.

1 Like

Thanks a lot @Philippe_Hausler

Is it part of GitHub - apple/swift-async-algorithms: Async Algorithms for Swift ?

I only knew swift-algorithms package existed, didn't know that swift-async-algorithms also exists!

Is there any WWDC video that goes through this?

Yep; AsyncTimerSequence is from there.

1 Like