Swift Async Algorithms Proposal: Relay (Generators Lite?)

Relay

Introduction

Swift's built-in language features for asynchronous sequences provide a lightweight, ergonomic syntax for consuming elements from an asynchronous source, but creating those asynchronous sequences can feel more involved. AsyncStream works very well in its role of adapting traditional event sources into the world of structured concurrency, but there isn't an equivalent convenience for creating a natively asynchronous source.

For example, if we wanted to output the Fibonacci sequence asynchronously as an AsyncSequence, we'd need to create a type similar to the following:


// This could of course be implemented as a non-async `Sequence`, but serves
// well for illustrative purposes 
struct AsyncFibonacciSequence: AsyncSequence, Sendable {
  
  typealias Element = Int
  
  struct Iterator: AsyncIteratorProtocol {
    
    var seed = (0, 1)
    
    mutating func next() async -> Element? {
      if Task.isCancelled { return nil }
      defer { seed = (seed.1, seed.0 + seed.1) }
      return seed.0
    }
  }
  
  func makeAsyncIterator() -> Iterator {
    Iterator()
  }
}

let fibonacci = AsyncFibonacciSequence()

For simple routines, writing this amount of code creates unnecessary friction for programmers.

In addition, it's difficult to share the sequence amongst tasks. While the sequence does conform to Sendable, its iterator does not, so tasks can only share the sequence itself, and each time the sequence is iterated it starts from the beginning. To circumvent this, a programmer may attempt to share the asynchronous sequence's iterator instead, but this results in a compiler warning (soon to be an error) about sending non-Sendable values across actor boundaries.
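To make the restart behaviour concrete, here is a minimal sketch that iterates the same sequence value twice. The sequence type is repeated so the snippet stands alone, and `firstElements(_:of:)` is a hypothetical helper introduced only for this illustration.

```swift
struct AsyncFibonacciSequence: AsyncSequence, Sendable {
  typealias Element = Int

  struct Iterator: AsyncIteratorProtocol {
    var seed = (0, 1)

    mutating func next() async -> Int? {
      if Task.isCancelled { return nil }
      defer { seed = (seed.1, seed.0 + seed.1) }
      return seed.0
    }
  }

  func makeAsyncIterator() -> Iterator {
    Iterator()
  }
}

// Hypothetical helper: collects the first `count` elements of a fresh iteration.
func firstElements(_ count: Int, of sequence: AsyncFibonacciSequence) async -> [Int] {
  var result: [Int] = []
  // Each `for await` loop calls `makeAsyncIterator()`, so it starts from a new seed.
  for await value in sequence {
    if result.count == count { break }
    result.append(value)
  }
  return result
}
```

Calling `firstElements(5, of: fibonacci)` twice on the same `fibonacci` value should yield the same five numbers both times, because no iterator state survives between the two loops.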

Proposed solution

Asynchronous relays work in a similar way to what are sometimes called 'generators' in other languages. They expose a convenient shorthand that makes creating a producing asynchronous sequence nearly as frictionless as consuming one.

Here's how the Fibonacci asynchronous sequence above could be converted to an asynchronous relay with equivalent functionality:

let fibonacci = AsyncRelaySequence { yield in
  var seed = (0, 1)
  while !Task.isCancelled {
    await yield(seed.0)
    seed = (seed.1, seed.0 + seed.1)
  }
}

But often, it's desirable to share a producing iterator across Tasks. AsyncRelay facilitates this directly, without a requirement to call AsyncRelaySequence's makeAsyncIterator() method:


// Now just `AsyncRelay` instead of `AsyncRelaySequence`
let fibonacci = AsyncRelay { yield in 
  var seed = (0, 1)
  while !Task.isCancelled {
    await yield(seed.0)
    seed = (seed.1, seed.0 + seed.1)
  }
}

Task {
  let fib1 = await fibonacci.next()
  ...
}
Task {
  let fib2 = await fibonacci.next()
  ...
}
Task {
  let fib3 = await fibonacci.next()
  ...
}

AsyncRelay also has sibling throwing varieties, AsyncThrowingRelay and AsyncThrowingRelaySequence, which leverage Swift's built-in control flow syntax to shut down a relay when an Error is thrown.

let imageRequest1 = ...
let imageRequest2 = ...
let imageRequest3 = ...
let relay = AsyncThrowingRelay { yield in 
  await yield(try await imageRequest1.fetch()) // Good.
  await yield(try await imageRequest2.fetch()) // Throws! Relay will exit here and cancel.
  await yield(try await imageRequest3.fetch()) // Doesn't get called.
}

// Somewhere else in the code...

do {
  let image1 = try await relay.next() // Great.
  let image2 = try await relay.next() // Uh-oh, Throws! 
  let image3 = try await relay.next() // Doesn't get called.
}
...

Detailed design

// An asynchronous sequence generated from a closure that limits its rate of
// element production to the rate of element consumption
//
// ``AsyncRelaySequence`` conforms to ``AsyncSequence``, providing a convenient
// way to create an asynchronous sequence without manually conforming a type
// to ``AsyncSequence``.
//
// You initialize an ``AsyncRelaySequence`` with a closure that receives an
// ``AsyncRelay.Continuation``. Produce elements in this closure, then provide
// them to the sequence by calling the suspending continuation. Execution will
// resume as soon as the produced value is consumed. You call the continuation
// instance directly because it defines a `callAsFunction()` method that Swift
// calls when you call the instance. When there are no further elements to
// produce, simply allow the function to exit. This causes the sequence
// iterator to produce a nil, which terminates the sequence.
//
// Both ``AsyncRelaySequence`` and its iterator ``AsyncRelay`` conform to
// ``Sendable``, which permits them being called from concurrent contexts.
public struct AsyncRelaySequence<Element: Sendable> : Sendable, AsyncSequence {
  public typealias AsyncIterator = AsyncRelay<Element>
  public init(_ producer: @escaping AsyncIterator.Producer)
  public func makeAsyncIterator() -> AsyncRelay<Element>
}

// An asynchronous sequence iterator generated from a closure that limits its
// rate of element production to the rate of element consumption
//
// For usage information see ``AsyncRelaySequence``.
//
// ``AsyncRelay`` conforms to ``Sendable``, which permits calling it from
// concurrent contexts.
public struct AsyncRelay<Element: Sendable> : Sendable, AsyncIteratorProtocol {
  
  public typealias Producer = @Sendable (Continuation) async -> Void
  
  public struct Continuation {
    public func callAsFunction(_ element: Element) async
  }
  public init(_ producer: @escaping Producer)
  public func next() async -> Element?
}

// A throwing asynchronous sequence generated from a closure that limits its
// rate of element production to the rate of element consumption
//
// ``AsyncThrowingRelaySequence`` conforms to ``AsyncSequence``, providing a
// convenient way to create a throwing asynchronous sequence without manually
// conforming a type to ``AsyncSequence``.
//
// You initialize an ``AsyncThrowingRelaySequence`` with a closure that
// receives an ``AsyncThrowingRelay.Continuation``. Produce elements in this
// closure, then provide them to the sequence by calling the suspending
// continuation. Execution will resume as soon as the value produced is
// consumed. You call the continuation instance directly because it defines a
// `callAsFunction()` method that Swift calls when you call the instance. When
// there are no further elements to produce, simply allow the function to exit.
// This causes the sequence to produce a nil, which terminates the sequence.
// You may also choose to throw from within the closure which terminates the
// sequence with an ``Error``.
//
// Both ``AsyncThrowingRelaySequence`` and its iterator ``AsyncThrowingRelay``
// conform to ``Sendable``, which permits them being called from
// concurrent contexts.
public struct AsyncThrowingRelaySequence<Element: Sendable> : Sendable, AsyncSequence {
  
  public typealias AsyncIterator = AsyncThrowingRelay<Element>  
  public init(_ producer: @escaping AsyncIterator.Producer)  
  public func makeAsyncIterator() -> AsyncThrowingRelay<Element>
}

// A throwing asynchronous sequence iterator generated from a closure that
// limits its rate of element production to the rate of element consumption
//
// For usage information see ``AsyncThrowingRelaySequence``.
//
// ``AsyncThrowingRelay`` conforms to ``Sendable``, which permits calling it
// from concurrent contexts.
public struct AsyncThrowingRelay<Element: Sendable> : Sendable, AsyncIteratorProtocol {
  public typealias Producer = @Sendable (Continuation) async throws -> Void
  public struct Continuation {
    public func callAsFunction(_ element: Element) async
  }
  public init(_ producer: @escaping Producer)
  public func next() async throws -> Element?
}

Acknowledgments

Asynchronous relays are heavily inspired by generators and sequence builders available in other languages.

Hey @tcldr

I'm not sure I understand the key differences with AsyncStream. Could you explain, please?

Thanks.

Hi @twittemb. Yes, of course!

The big difference is that it supports back pressure. Although AsyncStream has an initialiser that (I think) supports back pressure (via an unfolding () async -> Element? closure), it can't hold any state. So in practice it's quite limited.
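As a rough illustration of that limitation: the unfolding closure takes no arguments and returns one element per call, so any state has to live outside it. The sketch below assumes the state is kept in a hypothetical actor, `FibonacciState`; both that type and `makeUnfoldingFibonacci()` are names made up for this example.

```swift
// Hypothetical actor holding the state that the unfolding closure cannot hold itself.
actor FibonacciState {
  private var seed = (0, 1)

  // Returns the current Fibonacci number and advances the seed.
  func advance() -> Int {
    defer { seed = (seed.1, seed.0 + seed.1) }
    return seed.0
  }
}

func makeUnfoldingFibonacci() -> AsyncStream<Int> {
  let state = FibonacciState()
  // The closure runs once per call to `next()`, so production follows demand.
  return AsyncStream(unfolding: { await state.advance() })
}
```

This keeps production in step with consumption, but the producer has to be restructured around externally stored state rather than written as a simple loop.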

The AsyncStream initialiser which takes a closure and supplies a continuation is quite flexible, but it does not support back pressure. That means that production can run way ahead of consumption, and in the Fibonacci example you'd end up overrunning the buffer pretty quickly.

AsyncStream { continuation in
  var seed = (0, 1)
  while !Task.isCancelled {
    continuation.yield(seed.0) // yield is synchronous, so we NEVER suspend here
    seed = (seed.1, seed.0 + seed.1)
  }
}
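To see the lack of back pressure in isolation, here is a small sketch that bounds the buffer with the `bufferingNewest` policy and counts how many elements are discarded. `overrunSmallBuffer()` is a hypothetical function written for this illustration; the buffer size and element count are arbitrary.

```swift
// Returns the elements that survive a two-element buffer, plus how many were dropped.
func overrunSmallBuffer() async -> (received: [Int], dropped: Int) {
  var dropped = 0
  let stream = AsyncStream(Int.self, bufferingPolicy: .bufferingNewest(2)) { continuation in
    for value in 0..<5 {
      // `yield` returns immediately with a result instead of waiting for a consumer.
      if case .dropped = continuation.yield(value) {
        dropped += 1
      }
    }
    continuation.finish()
  }

  // The consumer only starts after the producer has already run to completion.
  var received: [Int] = []
  for await value in stream {
    received.append(value)
  }
  return (received, dropped)
}
```

Because the producer never waits, the older elements are discarded to make room before the consumer has a chance to read them. With the default unbounded policy nothing is dropped, but the buffer simply keeps growing instead.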

So with AsyncRelay you get a kind of, well, relay between Tasks! It takes the following sequence:

  1. The first consumer calls next. This kicks off the inner Task and, with it, the user-supplied closure. The consumer Task is immediately suspended.
  2. The inner Task arrives at its first yield. When it calls yield, it passes through an element for the awaiting next call and suspends. This resumes the awaiting next call, which returns the element to the consumer Task.
  3. After processing the element, the consumer Task calls next again and suspends. This resumes the inner Task waiting at yield, which goes on to produce the next element.
  4. The inner Task arrives at its next yield, passes through an element and suspends. This resumes the awaiting consumer Task. And so on, back to step 3, until...
  5. The sequence ends, or the relay is deallocated, cancelling the inner Task.
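The hand-off in the steps above can be sketched with an actor and a pair of checked continuations. This is a simplified illustration, not the implementation from the PR: `SketchRelay` and `collectFibonacci(count:)` are hypothetical names, the producer receives a plain closure rather than a `Continuation` type, and cancellation, deallocation and throwing producers are all left out.

```swift
// Hypothetical sketch: hands elements from a producer closure to consumers one at a time.
actor SketchRelay<Element: Sendable> {
  typealias Yield = @Sendable (Element) async -> Void
  typealias Producer = @Sendable (Yield) async -> Void

  private let producer: Producer
  private var started = false
  private var finished = false
  private var waitingConsumers: [CheckedContinuation<Element?, Never>] = []
  private var parkedProducer: CheckedContinuation<Void, Never>?

  init(_ producer: @escaping Producer) {
    self.producer = producer
  }

  // Steps 1 and 3: register the consumer, then start or resume the producer.
  func next() async -> Element? {
    await withCheckedContinuation { consumer in
      if finished {
        consumer.resume(returning: nil)
        return
      }
      waitingConsumers.append(consumer)
      if let parked = parkedProducer {
        parkedProducer = nil
        parked.resume()
      } else if !started {
        started = true
        Task { await self.run() }
      }
    }
  }

  private func run() async {
    await producer { element in await self.yield(element) }
    // Step 5: the closure returned, so every waiting consumer receives nil.
    finished = true
    for consumer in waitingConsumers { consumer.resume(returning: nil) }
    waitingConsumers.removeAll()
  }

  // Steps 2 and 4: deliver one element, then park until another consumer arrives.
  // Assumes the producer calls `yield` from one task at a time, so a consumer is
  // always waiting when this runs.
  private func yield(_ element: Element) async {
    waitingConsumers.removeFirst().resume(returning: element)
    await withCheckedContinuation { continuation in
      if waitingConsumers.isEmpty {
        parkedProducer = continuation
      } else {
        continuation.resume()
      }
    }
  }
}

// Usage: the producer only advances when a consumer asks for the next element.
func collectFibonacci(count: Int) async -> [Int] {
  let relay = SketchRelay<Int> { yield in
    var seed = (0, 1)
    for _ in 0..<count {
      await yield(seed.0)
      seed = (seed.1, seed.0 + seed.1)
    }
  }
  var collected: [Int] = []
  while let value = await relay.next() {
    collected.append(value)
  }
  return collected
}
```

The actor serialises access to the two continuation slots, which is what lets several Tasks call `next()` safely in this sketch. One consequence of the simplification is that the inner Task holds a strong reference to the relay, so the "relay is deallocated" path from step 5 is not modelled here.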

Other differences:

  • AsyncRelay is lazy: the inner Task is spawned only upon the first call to next, whereas with AsyncStream it's kicked off immediately.
  • AsyncRelaySequence can be iterated multiple times, and each consumer gets their own fresh iterator.
  • The iterator, AsyncRelay, can be consumed from multiple Tasks concurrently.
  • The syntax is a bit more succinct. AsyncRelay allows you to just exit the closure scope to finish, rather than calling continuation.finish(), and AsyncThrowingRelay allows you to throw to exit (and return an Error to an awaiting consumer), rather than calling continuation.finish(throwing:).

Does that explain it a bit better?

It's not a replacement for AsyncStream, just an asynchronous dual to it.

Thanks for the insight.

From my understanding it is very similar to flow builder.

I understand the need. My only concern so far would be the blurry lines from a developer perspective between AsyncStream, AsyncRelay and eventually AsyncChannel. At some point it can become a bit confusing and unclear what tool matches the need, back pressure and closure execution wise.

Yeah, and I think I've heard the term 'sequence builder' from Kotlin, and perhaps generators elsewhere.

I think there's talk of creating this as a language feature, too. But that's why I think it could have quite a good home here in the Async Algorithms package, so that people have something available to them in the interim.

In my opinion, that is something we should go after - it seems that is the direction a number of designs have alluded to, and it truthfully feels like a larger feature than just a singular interface in a library. Additionally, items we pitch for libraries should be considered carefully, as they may live indefinitely - it would be a shame to pour work into building something and then have it replaced a short while later by a more elegant language-level solution...

Sure, that makes sense. I've closed the PR. I'll leave my branch up for anyone that might benefit from an interim solution.