[Concurrency] AsyncSequence

Hi everyone,

@Philippe_Hausler and I would like to pitch the following proposal. We build on the Swift Concurrency pitches to add the concept of an AsyncSequence to the Swift standard library.

An up-to-date version of this proposal can be found here.

Async/Await: Sequences

Swift's proposed async/await feature provides an intuitive, built-in way to write and use functions that return a single value at some future point in time. We propose building on top of this feature to create an intuitive, built-in way to write and use functions that return many values over time.

This proposal is composed of the following pieces:

  1. A standard library definition of a protocol that represents an asynchronous sequence of values
  2. Compiler support to use for...in syntax on an asynchronous sequence of values
  3. A standard library implementation of commonly needed functions that operate on an asynchronous sequence of values

Motivation

We'd like iterating over asynchronous sequences of values to be as easy as iterating over synchronous sequences of values. An example use case is iterating over the lines in a file, like this:

for await try line in myFile.lines() {
  // Do something with each line
}

for/in Syntax

To enable the use of for in, we must define the return type from func lines() to be something that the compiler understands can be iterated. Today, we have the Sequence protocol. Let's try to use it here:

extension URL {
  struct Lines: Sequence { /* ... */ }
  func lines() async -> Lines
}

Unfortunately, what this function actually does is wait until all lines are available before returning. What we really wanted in this case was to await each line. While it is possible to imagine modifications to lines to behave differently (e.g., giving the result reference semantics), it would be better to define a new protocol to make this iteration behavior as simple as possible.

extension URL {
  struct Lines: AsyncSequence { /* ... */ }
  func lines() async -> Lines
}

AsyncSequence allows for waiting on each element instead of the entire result by defining an asynchronous next() function on its associated iterator type.

Additional AsyncSequence functions

Going one step further, let's imagine how it might look to use our new lines function in more places. Perhaps we only want the first line of a file because it contains a header that we are interested in:

let header: String?
do {
  for await try line in myFile.lines() {
    header = line
    break
  }
} catch {
  header = nil // file didn't exist
}

Or, perhaps we actually do want to read all lines in the file before starting our processing:

var allLines: [String] = []
do {
  for await try line in myFile.lines() {
    allLines.append(line)
  }
} catch {
  allLines = []
}

There's nothing wrong with the above code, and it must be possible for a developer to write it. However, it does seem like a lot of boilerplate for what might be a common operation. One way to solve this would be to add more functions to URL:

extension URL {
  struct Lines : AsyncSequence { }

  func lines() -> Lines
  func firstLine() throws async -> String?
  func collectLines() throws async -> [String]
}

It doesn't take much imagination to think of other places where we may want to do similar operations, though. Therefore, we believe the best place to put these functions is instead as an extension on AsyncSequence itself, specified generically -- just like Sequence.

Proposed solution

The standard library will define the following protocols:

public protocol AsyncSequence {
  associatedtype AsyncIterator: AsyncIteratorProtocol where AsyncIterator.Element == Element
  associatedtype Element
  func makeAsyncIterator() -> Iterator
}

public protocol AsyncIteratorProtocol {
  associatedtype Element
  mutating func next() async throws -> Element?
  mutating func cancel()
}

The compiler will generate code to allow use of a for in loop on any type which conforms with AsyncSequence. The standard library will also extend the protocol to provide familiar generic algorithms. Here is an example which does not actually call an async function within its next, but shows the basic shape:

struct Counter : AsyncSequence {
  let howHigh: Int

  struct AsyncIterator : AsyncIteratorProtocol {
    let howHigh: Int
    var current = 1
    mutating func next() async -> Int? {
      guard current <= howHigh else {
        return nil
      }

      let result = current
      current += 1
      return result
    }

    mutating func cancel() {
      current = howHigh + 1 // Make sure we do not emit another value
    }
  }

  func makeAsyncIterator() -> AsyncIterator {
    return Iterator(howHigh: howHigh)
  }
}

At the call site, using Counter would look like this:

for await i in Counter(howHigh: 3) {
  print(i)
}

/* 
Prints the following, and finishes the loop:
1
2
3
*/


for await i in Counter(howHigh: 3) {
  print(i)
  if i == 2 { break }
}
/*
Prints the following, then calls cancel before breaking out of the loop:
1
2
*/

Any other exit (e.g., return or throw) from the for loop will also call cancel first.

Detailed design

Returning to our earlier example:

for await try line in myFile.lines() {
  // Do something with each line
}

The compiler will emit the equivalent of the following code:

var it = myFile.lines().makeAsyncIterator()
while let value = await try it.next() {
  // Do something with each line
}

All of the usual rules about error handling apply. For example, this iteration must be surrounded by do/catch, or be inside a throws function to handle the error. All of the usual rules about await also apply. For example, this iteration must be inside a context in which calling await is allowed like an async function.

Cancellation

If next() returns nil then the iteration ends naturally and the compiler does not insert a call to cancel(). If next() throws an error, then iteration also ends and the compiler does not insert a call to cancel(). In both of these cases, it was the AsyncSequence itself which decided to end iteration and there is no need to tell it to cancel.

If, inside the body of the loop, the code calls break, return or throw, then the compiler first inserts a synchronous call to cancel() on the it iterator.

If this iteration is itself in a context in which cancellation can occur, then it is up to the developer to check for cancellation themselves and exit.

for await try line in myFile.lines() {
  // Do something
  ...
  // Check for cancellation
  await try Task.checkCancellation()
}

This way we leave control of cancellation (which is a potential suspension point, and may be something to do either before or after receiving a value) up to the developer.

Rethrows

This proposal will take advantage of a separate proposal to add specialized rethrows conformance in a protocol, pitched here. With the changes proposed there for rethrows, it will not be required to use try when iterating an AsyncSequence which does not itself throw.

The await is always required because the definition of the protocol is that it is always asynchronous.

AsyncSequence Functions

The existence of a standard AsyncSequence protocol allows us to write generic algorithms for any type that conforms to it. There are two categories of functions: those that return a single value (and are thus marked as async), and those that return a new AsyncSequence (and are not marked as async themselves).

The functions that return a single value are especially interesting because they increase usability by changing a loop into a single await line. Functions in this category are first, contains, count, min, max, reduce, and more. Functions that return a new AsyncSequence include filter, map, and compactMap.

AsyncSequence to single value

Algorithms that reduce a for loop into a single call can improve readability of code. They remove the boilerplate required to set up and iterate a loop.

For example, here is the first function:

extension AsyncSequence {
  public func first() async rethrows -> AsyncIterator.Element?
}

With this extension, our "first line" example from earlier becomes simply:

let first = await try? myFile.lines().first()

The following functions will be added to AsyncSequence:

Function Note
contains(_ value: Element) async rethrows -> Bool Requires Equatable element
contains(where: (Element) async throws -> Bool) async rethrows -> Bool The async on the closure allows optional async behavior, but does not require it
allSatisfy(_ predicate: (Element) async throws -> Bool) async rethrows -> Bool
first(where: (Element) async throws -> Bool) async rethrows -> Element?
first() async rethrows -> Element? Not a property since properties cannot throw
min() async rethrows -> Element? Requires Comparable element
min(by: (Element, Element) async throws -> Bool) async rethrows -> Element?
max() async rethrows -> Element? Requires Comparable element
max(by: (Element, Element) async throws -> Bool) async rethrows -> Element?
reduce<T>(_ initialResult: T, _ nextPartialResult: (T, Element) async throws -> T) async rethrows -> T
reduce<T>(into initialResult: T, _ updateAccumulatingResult: (inout T, Element) async throws -> ()) async rethrows -> T
count() async rethrows -> Int Not a property since properties cannot throw

AsyncSequence to AsyncSequence

These functions on AsyncSequence return a result which is itself an AsyncSequence. Due to the asynchronous nature of AsyncSequence, the behavior is similar in many ways to the existing Lazy types in the standard library. Calling these functions does not eagerly await the next value in the sequence, leaving it up to the caller to decide when to start that work by simply starting iteration when they are ready.

As an example, let's look at map:

extension AsyncSequence {
  public func map<Transformed>(
    _ transform: @escaping (AsyncIterator.Element) async throws -> Transformed
  ) -> AsyncMapSequence<Self, Transformed>
}

public struct AsyncMapSequence<Upstream: AsyncSequence, Transformed>: AsyncSequence {
  public let upstream: Upstream
  public let transform: (Upstream.AsyncIterator.Element) async throws -> Transformed
  public struct Iterator : AsyncIterator { 
    public mutating func next() async rethrows -> Transformed?
    public mutating func cancel()
  }
}

For each of these functions, we first define a type which conforms with the AsyncSequence protocol. The name is modeled after existing standard library Sequence types like LazyDropWhileCollection and LazyMapSequence. Then, we add a function in an extension on AsyncSequence which creates the new type (using self as the upstream) and returns it.

Function
map<T>(_ transform: (Element) async throws -> T) -> AsyncMapSequence
compactMap<T>(_ transform: (Element) async throws -> T?) -> AsyncCompactMapSequence
drop(while: (Element) async throws -> Bool) async rethrows -> AsyncDropWhileSequence
dropFirst(_ n: Int) async rethrows -> AsyncDropFirstSequence
prefix(while: (Element) async throws -> Bool) async rethrows -> AsyncPrefixWhileSequence
prefix(_ n: Int) async rethrows -> AsyncPrefixSequence
filter(_ predicate: (Element) async throws -> Bool) async rethrows -> AsyncFilterSequence
append<S>(_ other: S) -> AsyncAppendSequence where Element == S.Element, S : AsyncSequence
prepend<S>(_ other: S) -> AsyncPrependSequence where Element == S.Element, S : AsyncSequence

Future Proposals

The following topics are things we consider important and worth discussion in future proposals:

Additional AsyncSequence functions

We've aimed for parity with the most relevant Sequence functions. There may be others that are worth adding in a future proposal.

This includes API which uses a time argument, which also must be coordinated with the discussion about Executor as part of the structured concurrency proposal.

AsyncSequence Builder

In the standard library we have not only the Sequence and Collection protocols, but concrete types which adopt them (for example, Array). We will need a similar API for AsyncSequence that makes it easy to construct a concrete instance when needed, without declaring a new type and adding protocol conformance.

Source compatibility

This new functionality will be source compatible with existing Swift.

Effect on ABI stability

This change is additive to the ABI.

Effect on API resilience

This change is additive to API.

Alternatives considered

Asynchronous cancellation

The cancel() function on the iterator could be marked as async. However, this means that the implicit cancellation done when leaving a for/in loop would require an implicit await -- something we think is probably too much to hide from the developer. Most cancellation behavior is going to be as simple as setting a flag to check later, so we leave it as a synchronous function and encourage adopters to make cancellation fast and non-blocking.

Opaque Types

Each AsyncSequence-to-AsyncSequence algorithm will define its own concrete type. We could attempt to hide these details behind a general purpose type eraser. We believe leaving the types exposed gives us (and the compiler) more optimization opportunities. A great future enhancement would be for the language to support some AsyncSequence where Element=...-style syntax, allowing hiding of concrete AsyncSequence types at API boundaries.

Reusing Sequence

If the language supported a reasync concept, then it seems plausible that the AsyncSequence and Sequence APIs could be merged. However, we believe it is still valuable to consider these as two different types. The added complexity of a time dimension in asynchronous code means that some functions need more configuration options or more complex implementations. Some algorithms that are useful on asynchronous sequences are not meaningful on synchronous ones. We prefer not to complicate the API surface of the synchronous collection types in these cases.

Naming

The names of the concrete AsyncSequence types is designed to mirror existing standard library API like LazyMapSequence. Another option is to introduce a new pattern with an empty enum or other namespacing mechanism.


A note about Combine

This is not part of the pitch, but still worth a callout. There is an overlap in the ideas presented here and those that are part of the Combine framework, as well as many other related projects in the open source community. While Combine is not open source and therefore not part of the Swift Evolution process, we would like to note that the primary authors of this proposal are also involved with the design and evolution of Combine. We are not yet ready to discuss details on our plans for how this change to Swift interacts with Combine, but we do want to reassure readers that we are committed to providing a clear and comprehensive developer experience for those developing on Apple platforms.

41 Likes

Cool, this seems like a very logical direction. A few thoughts on this:

Most significantly, the integration of for/in and cancel() seems like it is breaking separation of concerns and is unprecedented by other things that leave scopes early. Have you consider implementing cancelation with a deinit() on the AsyncIterator type? This would make this compose correctly in other places that don't use the for/in syntax.


In terms of syntax, the pile of keywords is a mouthful:

for await try line in myFile.lines() {
  // Do something with each line
}

I like how this composes with the "rethrows" pitch to make it so these keywords will only be required when the concrete sequence requires them.

That said, when present, why do the await and try keywords get applied to the pattern? It doesn't execute any code. It also shouldn't be on the initializer expression (where the keywords already mean something else for throwing and async initializers), so I think they should be modifiers on the for keyword, ala:

await try for line in myFile.lines() {
  // Do something with each line
}

This is consistent with the fact that it is the for itself that is forming the iterator and doing the potentially async/throwing stuff.


One minor typo:

public protocol AsyncSequence {
  associatedtype AsyncIterator: AsyncIteratorProtocol where AsyncIterator.Element == Element
  associatedtype Element
  func makeAsyncIterator() -> Iterator
}

^the result type should be AsyncIterator.

This is an exciting direction Tony, I'm glad you're pushing on it.

-Chris

10 Likes

This is looking great :+1:

A quick heads up for readers of all the various proposals:

This feature would be used by TaskGroup (from the [Concurrency] Structured concurrency), to make the iteration over the task group's results in the same style as proposed here; Cancellation handling would be consistent with whatever this proposal ends up with as well.

4 Likes

One thing that I find interesting is the presence of both AsyncSequence and AsyncIterator. It’s been discussed that one of the flaws of Sequence is that it’s not really clear (and definitely not enforced) whether conforming types must support multiple passes. I suspect most AsyncSequences will not support multiple passes, but there’s no way to enforce that.* So one alternate design would be to have just AsyncIterator.** A for await loop would still make a private copy of the iterator, but otherwise everything would still work without having to go through the indirection of AsyncSequence and makeAsyncIterator. (That’s always been a silly overhead in defining a custom sequence as well.)

This doesn’t fix everything, though, since you can still copy around the iterator value, and that probably doesn’t make the iterator multipass either. And I can imagine it being less obvious that manually calling next() (or cancel()) in the body of the for await loop won’t affect the loop iteration! So this may not actually be easier for newcomers to understand.

So overall I’m not sure which way this goes. But I think it’d be good to discuss as a Considered Alternative at the very least. :-)

P.S. One other thing that was considered early on was that Iterator should refine Sequence, with makeIterator returning self. At the time the compiler literally couldn’t handle this, and by the time it was reconsidered Sequence was considered to be problematic in other ways (which don’t necessarily apply to AsyncSequence). Was that considered for AsyncIterator? Is it a good idea or not?

* In a Swift with move-only types, we could mark makeAsyncIterator as consuming.

** Which may no longer make sense as a name. AsyncGenerator? Stream?

7 Likes

I sympathize with Chris’s complaint that this is a mouthful, but I think for await try is better than await try for for a handful of reasons:

  • Scanning through a function body, it’s easier to spot a loop with for at the front of the line. I’ll admit this isn’t a super strong justification because functions with trailing closures may also act loop-like, and other awaits won’t have bodies following them.

  • Putting the await “inside” the for is a reminder that there are multiple suspension points here, not just one before or after the loop.

  • While I don’t think we want arbitrary await or try patterns, it actually has a reasonable interpretation here: in order to destructure the result of next(), the caller must first await it and then try it (i.e. propagate errors).

I’m sure we’ll get used to whatever syntax is picked pretty quickly, though.

9 Likes

This looks great!

It may be worth adjusting this expansion to make sure async sequences are free of this long-standing issue with optional promotions vs the for-in statement:

let array = [1, 2, 3]
for i: Int? in array {
  print(i) // ⟹ 1, 2, 3, nil, nil, nil, nil...
}

(https://bugs.swift.org/browse/SR-8688)

3 Likes

This. Users are supposed to assume that Sequence is single-pass, but it actively tries to confuse them with methods like makeIterator() that can't mutate the Sequence, on top of the fact that IteratorProtocol also exists and is an excellent model of a single-pass Sequence. I wrote about this a while ago, but it bears repeating here:

You get one shot - one chance to inspect the elements in the sequence, and after that, anything you do is semantically undefined. We don't provide any guidance on what a Sequence should do if somebody tries to iterate it again after consuming it (return an empty iterator? trap? produce random values?).

func myAlgorithm<S>(_ input: S) where S: Sequence, S.Element == Int {
  if input.contains(42) {
    // We have consumed 'input'. Who knows what accessing it again will do?
  }
  // We have consumed 'input'. Who knows what accessing it again will do?
}

None of the Sequence API makes this behaviour clear. Actually, it actively tries to confuse you by having the same operations with the same names as you'd find on a Collection, but with totally different composition semantics. I would bet that at least 3/4 of generic algorithms which accept a Sequence are incorrect - and again, we don't provide any guidance about what might happen in those cases. They likely won't be "unsafe" in the sense of memory safety, but they will produce unpredictable results and possibly lose or corrupt user data.

All of these flaws are inherited by this proposed AsyncSequence design. In fact it actually adds one:

Sequence explicitly does not include a count property, because like all Sequence APIs which inspect the contents, it would consume it. It would tell you how many elements it used to contain, but that's kind of irrelevant because you'll never see that sequence of elements again.

That's not to say that Sequence is entirely useless, though! It's there because if you have an algorithm which really is single-pass, it would be nice for Collections to have access to it without the ugliness of creating an iterator. The downside is that the same method handles both consuming and non-consuming cases, with the same spelling.

Since we don't have an async Collection, it's hard to justify an async Sequence.

EDIT: Updated reason why we have Sequence (I had to look it up!)

9 Likes

One more thing (not related to this pitch): please do not add this API. Something like this might be nice on FileManager, but definitely not on URL.

Sorry but I just had to say something. That's me done, though :zipper_mouth_face:

2 Likes

If we can get some clarity around this that would be really good. An infinite AsyncIterator protocol would be a perfect building block for allowing users to transition from the SwiftNIO ChannelPipeline into an async world with minimal conceptual overhead, and would make bridging between SwiftNIO and higher-level applications much easier.

Otherwise I am very very +1 on this extension. It seems well thought out and I have confidence that the authors will tackle this ambiguity. Great work @Tony_Parker and @Philippe_Hausler!

7 Likes

On the keyword soup issue (for await try x in …), I wonder if the problem might be solved by eliminating the for. Starting without a try:

  await line in myFile.lines() {
    allLines.append(line)
  }

Or with a try:

  await try line in myFile.lines() {
    allLines.append(line)
  }

The idea is that this could be regarded as an await … in construct instead of a for … in construct, which might be a handy way to distinguish between the 2 use cases.

8 Likes

If we started off with just iterators, we wouldn't even need any special language constructs. It would just be a normal async function in a loop.

var lineIter = FileManager.default.file(at: url).makeLineIterator()
while let line = await lineIter.next() { // suspends while waiting for next line
  // process line.
}
1 Like

There is also a precedent here as JavaScript folks have chosen for await instead of await for in the end:

await for makes it appear that we are awaiting the result of a regular for-of loop.

Interestingly enough, Python ended up with async for, but that doesn't make much sense in our case, as await try already seems to be preferred (and more logical) in other contexts.

3 Likes

Thank you all for your helpful feedback! I'll try to consolidate some responses here.

AsyncIterator protocol only

An important part of this design is that developers can use the Swift they already know to interact with these async sequences. To me, that means sticking with the "for/in" syntax for iteration. I think if we incur a large departure in design from Sequence for AsyncSequence by dropping the sequence protocol we will end up in a place where for loops behave differently depending on what you're iterating over, which would just add something else to trip on. In other words, consistency in behavior is valuable even if we can find legitimate reasons that the Sequence API should have been designed differently.

Also, there are examples of both single-pass and multi-pass async sequences, so declaring the protocol as only usable by one of them limits its utility too much. With this lines example here, I think a reasonable behavior would be that if you iterate it twice you start at the beginning of the file each time. However, if we imagine an AsyncSequence of NotificationCenter posts (e.g.), then that would clearly be single-pass. So I think, just like Sequence, it will be up to concrete AsyncSequences to define their behavior and document it.

This proposal leaves room for a future direction of building protocols on top of AsyncSequence that more clearly define those which have room for efficient behavior around buffering. For async sequences which support peek, we could define PeekableAsyncSequence (straw man) and algorithms which can take advantage of the ability to look into future elements without consuming it could be written against that protocol instead of Sequence. So one way that we can help right the wrongs of Sequence is to make sure we only add generic API to AsyncSequence that does not require multi-pass behavior (for example, by re-starting iteration of the sequence).

Cancel vs deinit

If we required cancellation to be part of deinit then we also imply (perhaps require?) that AsyncIterators be class types. There are good examples of AsyncSequence iterators which do not need to be classes, like the one for AsyncMapSequence.

I think a more fundamental reason to build cancellation into the protocol vs relying on object lifetime is just that we (compiler and/or manually written code) can carefully control when cancellation is triggered. It's not reference counted, so it becomes far more deterministic from the point of view of the developer. Cancellation can also happen before the compiler has decided it's time to call release (which is hopefully the last retain) and therefore trigger deallocation.

This question does bring up an important point, though -- we should document here, and in tests, best practices for adopting this protocol. For example, having the ability to implement cancel directly means that adapter sequences like the one for map should call cancel on their upstream when they are themselves cancelled. It's probably a good idea to not emit any more values after cancel as well (upon a call to next).

Keyword soup

No surprise, probably, since my name is on the proposal, but I prefer the await and try inside the for. The reason is that I see the code like this:

for <stuff that runs each time> in <stuff that is called once>

and the key intuition one needs to make sense of this protocol is that it awaits and trys each time you loop. await for says, to me, await the result of the entire for loop.

11 Likes

You can even imagine, in a hypothetical Swift where for was an expression, that this would be valid:

let x = await try for await try line in await try makeURL().lines() {
  
}

which would mean "await the result of this for loop, which itself awaits each item in the AsyncSequence produced by awaiting makeURL().lines()"

Not that we should actually do that, but it illustrates how all three possible places to put the keywords are semantically meaningful, but they don't mean the same thing.

I sympathize with concerns elsewhere in the thread about keyword soup, but it's useful to remember that all the pieces of it are independently meaningful, and the reason there's a lot of them in a small space is we're packing an enormous amount of functionality into a small space.

15 Likes

Don't forget flatMap.

Thanks, that should be in the list.

Before I read any of the other replies, I felt I had to respond about the “Sets and Dictionary aren’t really Sequences” problem. Some of the other replies hinted at the problem. It’s caused by Sequence being over-specified. The for-in construct does not require the source sequence to have the order it vends elements be part of that type’s semantic; but (almost) all of Sequence’s methods do make that assumption. I’ll probably make a list of related threads when I type on my Mac (instead of this iPad).

The solution is to make a new protocol like Sequence whose semantics rip out the vending-order-is-part-of-the-semantic part. Technically, based on computer science theory, it could be a base protocol for Sequence, but ABI stability forbids inserting new base protocols, even if care is taken to preserve source stability.

You’re suggesting something similar to Sequence, so the same concerns apply. But it’s new, so we can add the base protocol(s) now. I know it’s more of a concern for computer science nerds and probably 80% of users won’t need it, but I’m suggesting base protocols, and as such can’t be added later. This happened with AdditiveArithmetic and Numeric right before ABI stability. I’ll come up with an outline after I wake up.

Oh, is there a reason the new sequence protocol calls its associated iterator type “AsyncIterator”? Unless you think types can simultaneously AsyncSequence and Sequence, you could reuse the “Iterator“ name.

2 Likes

With respect to AsyncIterator, we think there is no good reason to reuse the associated type name and basically prohibit something adopting both (or require some new language feature to disambiguate).

It’s off topic so we shouldn’t debate it here – we can move this sub-thread to a new discussion topic if more discussion is desired – but this is not true. Nowhere in the docs for Sequence does it specify ordering. Given sequences aren’t even guaranteed multi pass, you can’t infer from that that the order has any meaning other than that if you must step through something an element at a time, that stepping must of course happen in some order.

8 Likes

And there’s a precedent from JS where for await (const element of generator()) { ... } is the syntax for iterating over an async generator.

1 Like
Terms of Service

Privacy Policy

Cookie Policy