AsyncStream and progress reporting

Hello!
I have a class called Generator that produces a String asynchronously. I would like to implement a progress: AsyncStream<Double> property. Almost all the code I've seen runs the generating sequence inside the continuation closure, but that isn't possible in this case.

I've managed to write something that works, but it may be entirely the wrong way to do it. Right now, I have:

class Generator {
  func generate() async -> String {
    for i in 0...100 {
      onProgress(Double(i))
      try? await Task.sleep(nanoseconds: 20_000_000)
    }
    onFinish()
    return "OK!"
  }
  
  private var onProgress: (Double) -> Void = { _ in () }
  private var onFinish: () -> Void = { }
  
  lazy var progress = AsyncStream<Double> { continuation in
    onProgress = { value in
      continuation.yield(value)
    }
    onFinish = {
      continuation.finish()
    }
  }
}

Am I doing something wrong somewhere?
Is AsyncStream the correct type to use?
Should I clean up the callback closures at the end of generate?
Should I vend a new value each time the stream property is accessed instead of lazy-instantiating it? In this case, how should I manage the closures?

Thank you!

The continuation for AsyncStream (unlike other continuations) is designed to escape, so you can store it in a type just fine. This code looks OK to me as far as reference cycles go. AsyncStream seems reasonable here; it depends more on how you use it externally. The one thing you might want is to vend a new AsyncStream per access so that streams are not shared. In that case it would be better to store the continuation directly instead of routing it through closures the way you have.
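
For illustration, here is a minimal sketch of storing the continuation directly instead of going through the onProgress/onFinish closures. It assumes a single shared stream and relies on the AsyncStream build closure running synchronously inside the initializer; the `captured` variable is just a helper for this sketch.

```swift
final class Generator: Sendable {
  // A single stream; its continuation is stored directly
  // instead of being wrapped in onProgress/onFinish closures.
  let progress: AsyncStream<Double>
  private let continuation: AsyncStream<Double>.Continuation

  init() {
    // The build closure runs synchronously, so the continuation
    // can be captured here and kept for later use.
    var captured: AsyncStream<Double>.Continuation!
    progress = AsyncStream { captured = $0 }
    continuation = captured
  }

  func generate() async -> String {
    for i in 0...100 {
      continuation.yield(Double(i))
      try? await Task.sleep(nanoseconds: 20_000_000)
    }
    continuation.finish()
    return "OK!"
  }
}
```

With the default unbounded buffering, values yielded before a consumer starts iterating are kept in the stream's buffer, so a client can grab `progress` first and call `generate()` afterwards.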

Thanks! That's great news about the continuation, which was my main doubt. It's exactly why I was sharing the AsyncStream. If I can store the continuations in some collection like [UUID: AsyncStream<Double>.Continuation], I can bypass the closures and vend a new AsyncStream each time.
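
A rough sketch of that idea, under some assumptions: the dictionary is guarded by an NSLock (my choice, not something from the thread), the `snapshot(removingAll:)` helper is hypothetical, and continuations are only discarded when generate() finishes.

```swift
import Foundation

final class Generator: @unchecked Sendable {
  // One continuation per vended stream, guarded by a lock (an assumption of this sketch).
  private var continuations: [UUID: AsyncStream<Double>.Continuation] = [:]
  private let lock = NSLock()

  // Every access vends a new stream with its own buffer.
  var progress: AsyncStream<Double> {
    AsyncStream { continuation in
      lock.lock()
      defer { lock.unlock() }
      continuations[UUID()] = continuation
    }
  }

  func generate() async -> String {
    for i in 0...100 {
      for continuation in snapshot() { continuation.yield(Double(i)) }
      try? await Task.sleep(nanoseconds: 20_000_000)
    }
    // Finish every stream and drop the stored continuations.
    for continuation in snapshot(removingAll: true) { continuation.finish() }
    return "OK!"
  }

  // Hypothetical helper: copies the current continuations while holding the lock.
  private func snapshot(removingAll: Bool = false) -> [AsyncStream<Double>.Continuation] {
    lock.lock()
    defer { lock.unlock() }
    let current = Array(continuations.values)
    if removingAll { continuations.removeAll() }
    return current
  }
}
```

Each stream obtained from `progress` before generate() starts receives every value independently, since each one has its own continuation and buffer.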

Thanks again for your response!

Thanks for the info. Why might one want to vend a new AsyncStream per access? What are the tradeoffs of sharing or not sharing a stream?

Well, consumers sharing a stream would be consuming from each other's buffer of values, so AsyncStream asserts that no two tasks can be awaiting the same stream at the same time; otherwise there would be undefined behavior/races on that value.

In the continuation closure, I'm now generating a UUID and I'm storing the continuation in a dictionary.
I would also like the continuation to be discarded when it's not needed anymore. This can be done from the generator side when work is finished, but I would also want to handle cases where the client of the AsyncStream iteration goes away. As I understand it, I would need to set the onTermination closure of the continuation where I could remove the continuation from the dictionary.

Trying to assign anything (even { _ in }) to continuation.onTermination generates an error:

Converting non-concurrent function value to '@Sendable (AsyncStream<Double>.Continuation.Termination) -> Void' may introduce data races

I've tried to copy/paste verbatim the code at the end of the proposal, and it generates the same error:

let t = Task.detached {
  func make123Stream() -> AsyncStream<Int> {
    AsyncStream { continuation in
      continuation.onTermination = { termination in  //<--Error
        switch termination { 
        case .finished:
            print("Regular finish")
        case .cancelled:
            print("Cancellation")
        }
      }
      Task.detached {
        for n in 1...3 {
          continuation.yield(n)
          sleep(2)
        }
        continuation.finish()
      }
    }
  }

Did the API change as of Xcode 13b3? Is this a temporary issue with the current implementation?

I get it now. The onTermination closure needs to be annotated with @Sendable:

continuation.onTermination = { @Sendable termination in … }
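
For anyone landing here later, this is roughly how the cleanup could look once the closure is marked @Sendable. It is only a sketch: `ProgressRegistry`, `subscriberCount` and the private helpers are hypothetical names, and the NSLock is my assumption for protecting the dictionary. The continuations are copied out before finish() is called, because finish() invokes onTermination, which takes the same lock.

```swift
import Foundation

// Hypothetical helper: a lock-protected registry of subscriber continuations.
final class ProgressRegistry: @unchecked Sendable {
  private var continuations: [UUID: AsyncStream<Double>.Continuation] = [:]
  private let lock = NSLock()

  var subscriberCount: Int {
    lock.lock()
    defer { lock.unlock() }
    return continuations.count
  }

  func makeStream() -> AsyncStream<Double> {
    AsyncStream { continuation in
      let id = UUID()
      set(continuation, for: id)
      // Called when the stream finishes or the consuming task is cancelled.
      continuation.onTermination = { @Sendable [weak self] _ in
        self?.set(nil, for: id)
      }
    }
  }

  func yield(_ value: Double) {
    for continuation in snapshot() { continuation.yield(value) }
  }

  func finish() {
    // Iterate over a copy: finish() triggers onTermination, which locks again.
    for continuation in snapshot() { continuation.finish() }
  }

  private func set(_ continuation: AsyncStream<Double>.Continuation?, for id: UUID) {
    lock.lock()
    defer { lock.unlock() }
    continuations[id] = continuation
  }

  private func snapshot() -> [AsyncStream<Double>.Continuation] {
    lock.lock()
    defer { lock.unlock() }
    return Array(continuations.values)
  }
}
```

This covers both directions: the generator side can call finish() when the work is done, and a client whose task is cancelled mid-iteration removes its own continuation through onTermination.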