Reactive style programming with `for await`

I am in the process of looking into the refactoring opportunities of using many of the excellent new concurrency features in an app that uses a reactive style of programming.

Here many of the signals (async sequences) are basically indefinite - emitting new values as data changes on the backend.

I was hoping to be able to eliminate some of the 'manual' life cycle management where the subscription to a signal is stored in a cancellable/disposable and this cancellable has it's lifetime managed by the lifetime of the consumer of the signal (for instance some kind of view model).

My approach is something like this playground:

import Foundation
import _Concurrency
import PlaygroundSupport

func stream() -> AsyncStream<Int> {
  AsyncStream { continuation in
    continuation.yield(10)
    Task {
      try await Task.sleep(nanoseconds: 1_000_000_000)
      continuation.yield(20)
      try await Task.sleep(nanoseconds: 1_000_000_000)
      continuation.yield(30)
      try await Task.sleep(nanoseconds: 1_000_000_000)
      continuation.finish()
    }
  }
}

@MainActor
class Bar {
  func foo() async {
    for await a in stream() {
      print(a)
    }
  }
  deinit {
    print("DEINIT")
  }
}

Task {
  await Bar().foo()
}

PlaygroundPage.current.needsIndefiniteExecution = true

Where Bar takes the role of some view model and stream() represents an indefinite stream of values.

The output is:

10
20
30
DEINIT

and if I never finish the stream, no DEINIT is output.

I think that I was somehow expecting that the for await loop would be broken by releasing Bar, but as I can see in the playground, the loop continues for as long as there are new values in the stream. If the stream never ends my for loop will never end - as far as I can tell.

Is the answer to not use indefinite for await loops directly, but instead wrap the loops in a Task in order to support cancellation?
If so, I have a follow-up question about that, but I'd like to hear if I am making any wrong observations in this little experiment first.

1 Like

That is one approach; putting the work into a task allows you to cancel it. Which that will terminate the iteration. Another approach is to truncate the output via an operation like .prefix(3) which will give you a prefix of that sequence that is 3 elements and then it will no longer iterate.

What you are seeing is actually just a general issue with any async function - if it was just an async function that ran indefinitely that would have the same "holding onto Bar".

Now one could mix the task creation and the iteration together and get something like this:

extension AsyncSequence {
  func forEachResult(_ apply: @escaping @Sendable (Result<Element, Error>) async -> Void)) -> Task<Void, Never> {
    Task {
      do {
        for try await item in self {
          await apply(.success(item))
        }
      } catch {
        await apply(.failure(error))
      }
    }
  }
}

There are a number of edges here to consider; you still have capture semantics, you have a resource per iteration, if the task is not cancelled after use it will still potentially run if the AsyncSequence is indefinite, the AsyncSequence should be Sendable in this case. etc.

2 Likes