[Pitch] AsyncStream and AsyncThrowingStream

AsyncStream and AsyncThrowingStream

Introduction

The continuation types added in SE-0300 act as adaptors for synchronous code that signals completion by calling a delegate method or callback function. For code that instead yields multiple values over time, this proposal adds new types to support implementing an AsyncSequence interface.

Motivation

Swift’s new async/await features include support for adapting callback- or delegate-based APIs using UnsafeContinuation or CheckedContinuation. These single-use continuations work well for adapting APIs like the getInt(completion:) function defined here into asynchronous ones:

func getInt(completion: @escaping (Int) -> Void) {
    DispatchQueue(label: "myQueue").async {
        sleep(1)
        completion(42)
    }
}

By calling one of the with*Continuation(_:) functions, you can suspend the current task and resume it with a result or an error using the provided continuation.

func getInt() async -> Int {
    await withUnsafeContinuation { continuation in
        getInt(completion: { result in
            continuation.resume(returning: result)
        })
    }
}

This provides a great experience for APIs that asynchronously produce a single result, but some operations produce many values over time instead. Rather than being adapted to an async function, the appropriate solution for these operations is to create an AsyncSequence.

Repeating asynchronous operations typically separate the consumption of values from their location of use, either within a callback function or a delegate method. For example, when creating a DispatchSourceSignal, the values are processed in the source’s event handler closure:

let source = DispatchSource.makeSignalSource(signal: SIGINT)
source.setEventHandler {
    // do something with source.data
}
source.resume()

The same is true for delegates that are informative only, and need no feedback or exclusivity of execution to be valid. As one example, the AppKit NSSpeechRecognizerDelegate is called whenever the system recognizes a spoken command:

if let recognizer = NSSpeechRecognizer() {
    class CommandDelegate: NSObject, NSSpeechRecognizerDelegate {
        func speechRecognizer(
             _ sender: NSSpeechRecognizer,
            didRecognizeCommand command: String) 
        {
            // do something on each recognized command
        }
    }
    let delegate = CommandDelegate()
    recognizer.delegate = delegate
    ...
}

Both of these examples represent common design patterns in many applications, frameworks, and libraries. These delegate methods and callbacks are asynchronous in nature, but cannot be annotated as async functions because they don’t have a singular return value. While not all delegates or callbacks are suitable to be represented as asynchronous sequences, there are numerous enough cases that we would like to offer a safe and expressive way to represent producers that can yield multiple values or errors.

Proposed Solution

To fill this gap, we propose adding two new types: AsyncStream and AsyncThrowingStream. These types fill a role similar to continuations, bridging the gap from non-async/await asynchronous behavior into the world of async/await. We anticipate that types that currently provide multiple-callback or delegate interfaces to asynchronous behavior can use these AsyncStream types to provide an interface for use within async contexts.

The two AsyncStream types each include a nested Continuation type; these outer and inner types represent the consuming and producing sides of the operation, respectively. You send values, errors, and “finish” events via the Continuation, and clients consume those values and errors through the AsyncStream type’s AsyncSequence interface.

Creating a non-throwing AsyncStream

When you create an AsyncStream instance, you specify the element type and pass a closure that operates on the stream’s Continuation. You can yield values to this continuation multiple times, and the stream buffers any yielded elements until they are consumed via iteration.

The DispatchSourceSignal above can be given an AsyncStream interface this way:

extension DispatchSource {
    static func signals(_ signal: Int32) -> AsyncStream<UInt> {
        AsyncStream { continuation in
            let source = DispatchSource.makeSignalSource(signal: signal)
            source.setEventHandler {
                continuation.yield(source.data)
            }
            source.resume()
        }
    }
}

// elsewhere...

for await signal in DispatchSource.signals(SIGINT).prefix(while: { ... }) {
    // ...
}

As each value is passed to the DispatchSourceSignal’s event handler closure, the call to continuation.yield(_:) stores the value for access by a consumer of the sequence. With this implementation, signal events are buffered as they come in, and only consumed when an iterator requests a value.

Creating an AsyncThrowingStream

Along with the potentially infinite sequence in the example above, AsyncThrowingStream can also adapt APIs like the slightly contrived one below. The buyVegetables function uses callback closures that are called with each retrieved vegetable, as well as when the vegetables have all been returned or an error occurs.

func buyVegetables(
  shoppingList: [String],

  // a) invoked once for each vegetable in the shopping list
  onGotVegetable: (Vegetable) -> Void,

  // b) invoked once all available veggies have been retrieved
  onAllVegetablesFound: () -> Void,

  // c) invoked if a non-vegetable food item is encountered
  // in the shopping list
  onNonVegetable: (Error) -> Void
)

// Returns a stream of veggies
func findVegetables(shoppingList: [String]) -> AsyncThrowingStream<Vegetable> {
  AsyncThrowingStream { continuation in
    buyVegetables(
      shoppingList: shoppingList,
      onGotVegetable: { veggie in continuation.yield(veggie) },
      onAllVegetablesFound: { continuation.finish() },
      onNonVegetable: { error in continuation.finish(throwing: error) }
    )
  }
}

Note that a call to the finish() method is required to end iteration for the consumer of an AsyncStream. Any buffered elements are provided to the sequence consumer before finishing with either a simple terminating nil or a thrown error.
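As a rough sketch of the consuming side, a throwing stream can be iterated with `for try await` inside a `do`/`catch`. The names `ShoppingError`, `vegetableNames(in:)`, and `collectVegetables(_:)` are hypothetical, and the stream here is fed synchronously rather than from real callbacks. The sketch also assumes the standard library as it later shipped, where the type is spelled with an explicit failure parameter (`AsyncThrowingStream<Element, Error>`) rather than the single-parameter form used in this pitch.

```swift
// Hypothetical error type, for illustration only.
struct ShoppingError: Error {
    let item: String
}

// Builds a stream that yields each item and fails when it meets "candy".
// Assumption: the shipped AsyncThrowingStream<Element, Error> spelling.
func vegetableNames(in shoppingList: [String]) -> AsyncThrowingStream<String, Error> {
    AsyncThrowingStream { continuation in
        for item in shoppingList {
            if item == "candy" {
                continuation.finish(throwing: ShoppingError(item: item))
                return
            }
            continuation.yield(item)
        }
        continuation.finish()
    }
}

// Collects elements until the stream either ends or throws.
func collectVegetables(_ shoppingList: [String]) async -> (found: [String], failed: Bool) {
    var found: [String] = []
    do {
        for try await name in vegetableNames(in: shoppingList) {
            found.append(name)
        }
        return (found, false)
    } catch {
        // Elements buffered before the error have already been delivered.
        return (found, true)
    }
}
```

Calling `await collectVegetables(["carrot", "leek", "candy", "kale"])` should deliver the two buffered vegetables and then report the failure, which matches the rule above that buffered elements are provided before the terminating error.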

Awaiting Values

An AsyncStream provides an AsyncSequence interface to its values, so you can iterate over the elements in an AsyncStream by using for-in, or use any of the AsyncSequence methods added as part of SE-0298.

for await notif in NotificationCenter
      .notifications(for: ...)
      .prefix(3) 
{
    // update with notif
}

You may also create an iterator directly, and call its next() method for more control over iteration. Each call to next(), either through for-in iteration, an AsyncSequence method, or direct calls on an iterator, either immediately returns the earliest buffered element or awaits the next element yielded to the stream’s continuation.
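A minimal sketch of driving the iterator by hand might look like the following. The helper names `makeNumbers()` and `firstElements(of:count:)` are hypothetical; the stream simply yields five integers up front so that every call to `next()` finds a buffered element.

```swift
// Hypothetical stream that buffers 1 through 5 and then finishes.
func makeNumbers() -> AsyncStream<Int> {
    AsyncStream<Int> { continuation in
        for n in 1...5 {
            continuation.yield(n)
        }
        continuation.finish()
    }
}

// Pulls at most `count` elements by calling next() directly,
// stopping early if the stream ends first.
func firstElements(of stream: AsyncStream<Int>, count: Int) async -> [Int] {
    var iterator = stream.makeAsyncIterator()
    var result: [Int] = []
    while result.count < count, let value = await iterator.next() {
        result.append(value)
    }
    return result
}
```

Direct iteration like this is mostly useful when the consumer needs to interleave other work between elements or stop on a condition that a `for await` loop expresses awkwardly.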

As with any sequence, iterating over an AsyncStream multiple times, or creating multiple iterators and iterating over them separately, may produce an unexpected series of values. Neither AsyncStream nor its iterator is a Sendable type, and concurrent iteration is considered a programmer error.

Detailed design

The full API of AsyncStream, AsyncThrowingStream, and their nested Continuation and Iterator types is as follows:

/// An ordered, asynchronously generated sequence of elements.
///
/// AsyncStream is an interface type to adapt from code producing values to an
/// asynchronous context iterating them. This is intended to allow
/// callback or delegation based APIs to participate with async/await.
///
/// AsyncStream can be initialized with the option to buffer to a given limit.
/// The default value for this limit is Int.max. The buffering is only for
/// values that have yet to be consumed by iteration. Values can be yielded 
/// to the continuation passed into the closure. That continuation
/// is Sendable, in that it is intended to be used from concurrent contexts
/// external to the iteration of the stream.
///
/// A trivial use case producing values from a detached task would work as such:
///
///     let digits = AsyncStream(Int.self) { continuation in
///       detach {
///         for digit in 0..<10 {
///           continuation.yield(digit)
///         }
///         continuation.finish()
///       }
///     }
///
///     for await digit in digits {
///       print(digit)
///     }
///
public struct AsyncStream<Element> {
  public struct Continuation: Sendable {
    public enum Termination {
      case finished
      case cancelled
    }
    
    /// Resume the task awaiting the next iteration point by having it return
    /// normally from its suspension point. Buffer the value if nothing is awaiting
    /// the iterator.
    ///
    /// - Parameter value: The value to yield from the continuation.
    ///
    /// This can be called more than once and returns to the caller immediately
    /// without blocking for any awaiting consumption from the iteration.
    public func yield(_ value: Element)

    /// Resume the task awaiting the next iteration point by having it return
    /// nil. This signifies the end of the iteration.
    ///
    /// Calling this function more than once is idempotent. All values received
    /// from the iterator after it is finished and after the buffer is exhausted 
    /// are nil.
    public func finish()

    /// A callback to invoke when iteration of an AsyncStream is canceled.
    ///
    /// If an onTermination callback is set, when iteration of an AsyncStream is
    /// canceled via task cancellation that callback is invoked. The callback
    /// is disposed of after any terminal state is reached.
    ///
    /// Canceling an active iteration will first invoke the onTermination callback
    /// and then resume yielding nil. This means that any cleanup state can be
    /// emitted accordingly in the cancellation handler.
    public var onTermination: (@Sendable (Termination) -> Void)? { get nonmutating set }
  }

  /// Construct an AsyncStream buffering given an Element type.
  ///
  /// - Parameter elementType: The type the AsyncStream will produce.
  /// - Parameter maxBufferedElements: The maximum number of elements to
  ///   hold in the buffer. A value of 0 results in no buffering.
  /// - Parameter build: The work associated with yielding values to the 
  ///   AsyncStream.
  ///
  /// The maximum number of pending elements is enforced by dropping the oldest
  /// value when a new value comes in. By default this limit is unlimited.
  /// A value of 0 results in immediate dropping of values if there is no current
  /// await on the iterator.
  ///
  /// The build closure passes in a Continuation which can be used in
  /// concurrent contexts. It is thread safe to yield and finish. All calls
  /// to the continuation are serialized. However, calling yield from multiple
  /// concurrent contexts could result in out of order delivery.
  public init(
    _ elementType: Element.Type = Element.self,
    maxBufferedElements limit: Int = .max,
    _ build: (Continuation) -> Void
  )
}

extension AsyncStream: AsyncSequence {
  /// The asynchronous iterator for iterating an AsyncStream.
  ///
  /// This type is not Sendable. It is not intended to be used
  /// from multiple concurrent contexts. Any such case that next is invoked
  /// concurrently and contends with another call to next is a programmer error
  /// and will fatalError.
  public struct Iterator: AsyncIteratorProtocol {
    public mutating func next() async -> Element?
  }

  /// Construct an iterator.
  public func makeAsyncIterator() -> Iterator
}

extension AsyncStream.Continuation {
  /// Resume the task awaiting the next iteration point by having it return
  /// normally from its suspension point or buffer the value if no awaiting
  /// next iteration is active.
  ///
  /// - Parameter result: A result to yield from the continuation.
  ///
  /// This can be called more than once and returns to the caller immediately
  /// without blocking for any awaiting consumption from the iteration.
  public func yield(
    with result: Result<Element, Never>
  )

  /// Resume the task awaiting the next iteration point by having it return
  /// normally from its suspension point or buffer the value if no awaiting
  /// next iteration is active where the `Element` is `Void`.
  ///
  /// This can be called more than once and returns to the caller immediately
  /// without blocking for any awaiting consumption from the iteration.
  public func yield() where Element == Void
}

public struct AsyncThrowingStream<Element> {
  public struct Continuation: Sendable {
    public enum Termination {
      case finished(Error?)
      case cancelled
    }
    
    /// * See AsyncStream.Continuation.yield(_:) *
    public func yield(_ value: Element)

    /// Resume the task awaiting the next iteration point with a terminal state.
    /// If error is nil, this is a completion without error. If error is not 
    /// nil, then the error is thrown. Both of these states indicate the 
    /// end of iteration.
    ///
    /// - Parameter error: The error to throw or nil to signify termination.
    ///
    /// Calling this function more than once is idempotent. All values received
    /// from the iterator after it is finished and after the buffer is exhausted 
    /// are nil.
    public func finish(throwing error: Error? = nil)

    /// * See AsyncStream.Continuation.onTermination *
    public var onTermination: (@Sendable (Termination) -> Void)? { get nonmutating set }
  }

  /// * See AsyncStream.init *
  public init(
    _ elementType: Element.Type,
    maxBufferedElements limit: Int = .max,
    _ build: (Continuation) -> Void
  )
}

extension AsyncThrowingStream: AsyncSequence {
  public struct Iterator: AsyncIteratorProtocol {
    public mutating func next() async throws -> Element?
  }

  public func makeAsyncIterator() -> Iterator
}

extension AsyncThrowingStream.Continuation {
  /// Resume the task awaiting the next iteration point by having it return
  /// normally from its suspension point or buffer the value if no awaiting
  /// next iteration is active.
  ///
  /// - Parameter result: A result to yield from the continuation.
  ///
  /// This can be called more than once and returns to the caller immediately
  /// without blocking for any awaiting consumption from the iteration.
  public func yield<Failure: Error>(
    with result: Result<Element, Failure>
  )

  /// * See AsyncStream.Continuation.yield() *
  public func yield() where Element == Void
}

Buffering Values

By default, every element yielded to an AsyncStream’s continuation is buffered until consumed by iteration. This matches the expectation for most of the APIs we anticipate being adapted via AsyncStream — with a stream of notifications, database records, or other similar types, the caller needs to receive every single one.

If the caller specifies a different value n for maxBufferedElements, then the most recent n elements are buffered until consumed by iteration. If the caller specifies 0, the stream switches to a dropping behavior, dropping the value if nothing is awaiting the iterator’s next.
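The drop-oldest rule can be modeled without any concurrency at all. The `DropOldestBuffer` type below is a hypothetical illustration of the semantics described above, not the stream's actual storage. It also ignores the case where a consumer is already awaiting `next()`, in which a real stream would hand the value over directly instead of buffering or dropping it.

```swift
// Hypothetical model of the buffering rule; not the stream's real storage.
struct DropOldestBuffer<Element> {
    let limit: Int
    private(set) var elements: [Element] = []

    init(limit: Int) {
        precondition(limit >= 0, "limit must not be negative")
        self.limit = limit
    }

    // Stores a newly yielded element, evicting the oldest one on overflow.
    // With a limit of 0 nothing is ever stored.
    mutating func yield(_ element: Element) {
        guard limit > 0 else { return }
        if elements.count == limit {
            elements.removeFirst()
        }
        elements.append(element)
    }

    // Hands the earliest buffered element to a consumer, if there is one.
    mutating func next() -> Element? {
        elements.isEmpty ? nil : elements.removeFirst()
    }
}
```

Yielding 1 through 5 into a buffer with a limit of 2 leaves only 4 and 5 available to the consumer, which is the "most recent n elements" behavior described in the paragraph above.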

Finishing the Stream

Calling a continuation’s finish() method moves its stream into a “terminated” state. An implementor of an AsyncThrowingStream can optionally pass an error to be thrown by the iterator. After providing all buffered elements, the stream’s iterator will return nil or throw an error, as appropriate.

The first call to finish() sets the terminating behavior of the stream (either returning nil or throwing); further calls to finish() or yield(_:) once a stream is in a terminated state have no effect.
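A small sketch of these rules, assuming the default unbounded buffering; the function name `elementsBeforeFinish()` is hypothetical. Everything yielded before the first `finish()` is delivered, and the later calls are ignored.

```swift
// Yields two values, finishes, then tries to yield and finish again.
func elementsBeforeFinish() async -> [Int] {
    let stream = AsyncStream<Int> { continuation in
        continuation.yield(1)
        continuation.yield(2)
        continuation.finish()
        continuation.yield(3)   // ignored: the stream is already terminated
        continuation.finish()   // ignored: a repeated finish has no effect
    }

    var seen: [Int] = []
    for await value in stream {
        seen.append(value)
    }
    return seen
}
```

Because the terminal state is fixed by the first `finish()`, a producer that races a late value against its own shutdown cannot resurrect the stream.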

Cancellation Handlers

When defined, a continuation’s onTermination handler function is called when iteration ends, when the stream goes out of scope, or when the task containing the stream is canceled. You can safely use the onTermination handler to clean up resources that were opened or allocated at the start of the stream.

This onTermination behavior is shown in the following example — once the task containing the stream is canceled, the onTermination handler for the stream’s continuation is called:

let t = detach {
  func make123Stream() -> AsyncStream<Int> {
    AsyncStream { continuation in
      continuation.onTermination = { termination in
        switch termination {
        case .finished:
            print("Regular finish")
        case .cancelled:
            print("Cancellation")
        }
      }
      detach {
        for n in 1...3 {
          continuation.yield(n)
          sleep(2)
        }
        continuation.finish()
      }
    }
  }

  for await n in make123Stream() {
    print("for-in: \(n)")
  }
  print("After")
}
sleep(3)
t.cancel()

// for-in: 1
// for-in: 2
// Cancellation
// After

Convenience Methods

As conveniences, both AsyncStream continuations include a yield(with:) method that takes a Result as a parameter and, when the stream’s Element type is Void, a yield() method that obviates the need for passing an empty tuple.
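Both conveniences can be sketched with a stream of `Void` "tick" events. The function name `countTicks()` is hypothetical; the two calls inside the closure yield the same kind of element through the two convenience methods.

```swift
// Counts the elements produced by a Void stream fed through
// both convenience methods.
func countTicks() async -> Int {
    let ticks = AsyncStream<Void> { continuation in
        continuation.yield()                    // equivalent to yield(())
        continuation.yield(with: .success(()))  // Result-based form
        continuation.finish()
    }

    var count = 0
    for await _ in ticks {
        count += 1
    }
    return count
}
```

The `Result`-based form is most useful when the adapted API already hands a `Result` to its callback, so the value can be forwarded without unwrapping it first.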

Alternatives considered

YieldingContinuation

Users use os_unfair_lock/unlock or other locking primitives

The buffering behavior in AsyncStream requires the use of locks in order to be thread safe. Without a native locking mechanism, Swift developers would be left to write their own custom (and potentially per-platform) locks. We think it is better for a standard library solution to be provided that removes this complexity and potential source of errors.

Source compatibility

This proposal is purely additive and has no direct impact on existing source.

Effect on ABI stability

This proposal is purely additive and has no direct impact on ABI stability.

Effect on API resilience

The current implementation leaves room for future development of this type to offer different construction mechanisms. It is encapsulated in its own type hierarchy, so it has no immediate impact on API resilience and leaves room for later additions.

Acknowledgments

We would like to thank Jon Shier and David Nadoba for their helpful insight and feedback driving the discussion on YieldingContinuation, which made us look further into a more well-rounded solution. Special thanks go to the creators of the reactive-streams specification, without which numerous behavioral edge cases would not have been considered.

Thank you for writing this up! This is exactly what we need to add support for async/await to RSocket. Big +1!

Do you know how other languages solve this?

Kotlin solves this with a thing called Flow for one example. This proposal is relatively similar.

When is the closure called? As soon as the Async[Throwing]Stream is created, when I ask for the iterator or when I first call next() on it?

The implementation calls it immediately. Hence the non escaping characteristic. I tried putting it on the first call to next but that ends up having unexpected behavior of dropping values that would have been yielded before the iteration.

Big +1. Would cover a lot of great pub/sub use cases and beyond.

Just to double check, does this apply to the cancellation case as well?

Yes please! I realize that locks aren't the idealized model Swift is shooting for but they seem unavoidable. An implementation offered by the language that is optimized to work with the language concurrency model would be hugely valuable.

Yes, it does. And as an added bonus, it informs that block of what brought the stream to a terminal state: cancellation, finishing, or finishing with an error.

That is out of scope for this proposal. Foundation does offer a locking primitive, and doing any better or safer would require some fairly large language changes. That is another pitch for another time, but if folks are interested I’d be happy to help brainstorm a solution and aid someone in pitching it.

Big +1. This looks really good. I do have one concern, though. Although buffering may be what most users want, there are cases where you would only want to buffer a certain number of values before the producer stops producing more. How would backpressure be handled here? In previous iterations yield returned a bool to indicate that the buffer is full.

Without speaking about the Foundation Stream API’s future plans or current comparability concerns: are there any cases where the Stream data structure would offer features that AsyncStream is not able to provide? Or is AsyncStream addressing something completely different from the main use case of Stream? My naive notion is that AsyncStream is a superset of Stream (at least functionality-wise).

Thanks,
Chéyo

Foundation.Stream has nothing to do with anything here.

Cool. How so? Why share the name? I would love this explanation being added to the proposal to explain the differences.

Why? Stream is such a generic term collisions are essentially unavoidable. AsyncStream should be easily understood as a separate type and concept.

This paragraph probably needs an additional sentence to be clear. What is meant is that, since we do not provide a lock type as part of the stdlib API, we should provide this because it does not require someone to implement that locking functionality themselves.

Appreciate that buffering semantics are laid out in the pitch. :bowing_man:

The pitch now describes two supported behaviours:

  • Buffer + Drop Oldest: Buffer up to N elements, and drop oldest on buffer overflow
  • Drop On Yield: Drop the element when there is no awaiting call on next().

These would indeed cover many existing stream-of-values/PubSub use cases well as they are bridged to the new Swift async world. Most of these are "hot" streams, in the sense that they are unilaterally broadcast by their publisher, regardless of the presence, preference, or processing rate of the subscribers. So buffering and/or throttling are sensible options in this context.

However, the pitch does not quite offer an answer to backpressure use cases, that would be opened up (or made easier to implement) with the introduction of Swift async-await. A couple of notable examples:

  • A publisher may want to adapt to a slow subscriber, including any further asynchronous work spawned by the subscriber (under the Structured Concurrency model).

    e.g. a database query listener stream, which may observe dirty notifications at a very high frequency, but it does not rerun the database query & yield the result set, until the consumer has finished processing the previous yielded value.

    In Kotlin Coroutines, this behaviour maps to the SUSPEND buffer overflow strategy, which suspends the producer's emitting/yielding call until the buffer has space, or (if no buffer space is configured) until the consumer is done with the previous emission.

  • In some scenarios, we may need the guarantee of a subscriber receiving every single element yielded by the publisher.

    e.g. chunked file content processing using an AsyncStream

    In Kotlin Coroutines, this behaviour maps to a non-buffering Rendezvous channel, and is the default behaviour of "cold" Flow<T>.

Part of the premise of Swift async-await model, as I understand it, is to bake asynchrony on top of the familiar linear control flow model. In the synchronous control flow world, elements are guaranteed to be walked through one-by-one, and dropping happens only when explicitly requested (via lazy operators). So in my opinion, the asynchronous counterpart should maintain this same mental model and default semantics.

I also wonder how well AsyncStream can bridge to Combine without backpressure support in one form or another. As far as I understand, an out-of-the-box Combine Publisher is non-buffering, non-dropping.

I understood the meaning. I was just offering support for that point. We really do need some sort of native locking mechanism.

Naming this construct, as frequently happens, was one of the more difficult parts of designing this API. AsyncStream was the least bad name we were able to come up with. We'd be thrilled if the discussion on this thread hit upon a better name!

Our goals with the name AsyncStream were:

  • To have a concise name for the type, since we anticipate the type will frequently show up both at use sites and in API signatures.
  • For the name to clearly convey that it's a... stream... you know... an async sequence of elements.

The main alternative name we considered was Series. I personally advocated for AsyncStream because:

  • Series (to me) doesn't convey anything async, it's more of a synonym for sequence (e.g. see here).
  • Warning very subjective! AsyncSeries just seems like a weird synonym for AsyncSequence.

Full disclosure, others were more concerned about the collision with Foundation.Stream than I was. As @Jon_Shier says, other than modeling the same abstract concept, AsyncStream has no relationship to Foundation.Stream. If others feel like sharing the "stream" suffix is confusing, please let us know; that's valuable signal!

In the Apple ecosystem Foundation.Stream is the default stream construct. Similarly in dotnet they have streams that closely match Foundation.Stream. In C# 8 Async Streams were introduced in a way that they just build on top of regular streams. I understand this pitch did not decide to go this route, but I don't think we can introduce an AsyncStream construct without saying specifically why and how this differs from Foundation.Stream. The name is fine in my book as long as this distinction is spelled out in the proposal. I know the audience in this thread is more than familiar with reactive streams, but this is something I have not had to deal with yet and I don't think many devs are familiar.

Am I completely off base here in thinking that I should be able to adapt an API that currently uses Foundation.Stream to use AsyncStream?

This isn't really true, at least not in any meaningful way. Stream (previously NSStream) is an ancient, abstract bit of Foundation most developers will never see, much less use. Usage is almost entirely limited to the concrete InputStream and OutputStream subclasses which only deal with buffers in a few, rare bits of Foundation. Some of their usage has been replaced by DispatchSources and most other usage is largely avoided due to their impedance mismatch with Swift itself. So aside from the name itself, I doubt most Swift developers will even make the connection.

As for naming, I view Stream as poorly named anyway, as it's far too general a description for what it does. It should be DataStream or ByteStream, but I wouldn't want to take any of the names that could be applied to better types. In general there probably shouldn't be a Stream type at all unless there's some sort of universal representation that's useful for the language (which it may be for C#).

If we want to think about better (or at least different) names, it might be useful to define which capabilities of AsyncStream make it a stream rather than a sequence or series. @kylemacomber is likely right that "series" feels too similar to "sequence", but if that's the case, we need to determine what makes "stream" feel less similar. That it keeps internal state for buffering? That it handles termination?

Has a protocol-based design been explored? Experience with the lazy collection types has informed me that concrete types which customise their logic via closures are several times slower than copy-pasting the type and writing a manual specialisation.