Follow-up on AsyncSequence and parity with Sequence-based APIs

Introduction

When writing asynchronous code using AsyncSequence adopters, it is often necessary to convert back and forth between the asynchronous and synchronous worlds. Many similar APIs provide mechanisms for this, and those conversions work best as trailing syntax on the types themselves, which aids both call-site ergonomics and code completion.

In the same vein, a few APIs present on Sequence were missing from the initial AsyncSequence proposal. A number of these were omitted because they depended on language features that had not yet been proposed.

Motivation

Learning and testing are incredibly important in Swift, both as an implementation requirement and for developers using the language. The newly introduced AsyncSequence protocol offers great flexibility for expressing values over time, but in a few places it needs connections to the non-async world. One of those connections is collecting all values and awaiting the completion of the asynchronous sequence. The other major one is creating stand-in or pre-computed asynchronous sequences.

For nearly all use cases, AsyncSequence should have interfaces similar to Sequence. This gives developers a stable interface to work with, and the expectation that moving a prototype written as non-async code onto the asynchronous protocol is just as simple (with the caveat that symmetry only applies where it makes sense). This is a two-way street: where a mechanism makes sense for AsyncSequence, we should also consider whether it makes sense for Sequence.

Proposed Solution

To facilitate collecting values and moving from the asynchronous world into the synchronous world, we should add a collect extension on AsyncSequence that asynchronously gathers all of the values and produces an array of them. The function must follow the effects of the underlying asynchronous iteration: collecting an AsyncSequence that throws will throw, and collecting one that does not throw should not throw either. In other words, collect should be rethrows according to the conformance of the type it is called on. Furthermore, collecting all values must in and of itself be asynchronous.

extension AsyncSequence {
  public func collect() async rethrows -> [Element]
}
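
For illustration, here is a minimal sketch of how collect() could be implemented. It assumes a Swift 5.5 or later toolchain where AsyncSequence already supports rethrowing conformances. AsyncStream is used only as a sample source and is not part of this pitch.

```swift
// Sketch only: gathers every element of the sequence into an Array.
// `rethrows` ties the throwing behavior to the conformance of `Self`.
extension AsyncSequence {
    func collect() async rethrows -> [Element] {
        var result: [Element] = []
        for try await element in self {
            result.append(element)
        }
        return result
    }
}

// AsyncStream never throws, so the call below needs `await` but no `try`.
let numbers = AsyncStream<Int> { continuation in
    for value in 1...3 {
        continuation.yield(value)
    }
    continuation.finish()
}
let collected = await numbers.collect()
print(collected) // [1, 2, 3]
```

Collecting an AsyncThrowingStream with the same method would require `try await`, which is the rethrows behavior described above.
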

Similar to the construction of LazySequence adopters, a Sequence can easily produce values that can be represented asynchronously. This can be achieved with an extension on Sequence analogous to lazy (in this case named async) and a concrete generic type that adapts a given Sequence into a suitable asynchronous sequence.

public struct AsyncLazySequence<Base: Sequence>: AsyncSequence {
  public typealias Element = Base.Element
  public typealias AsyncIterator = Iterator
  
  public struct Iterator: AsyncIteratorProtocol {
    public mutating func next() async -> Base.Element?
  }
  
  public func makeAsyncIterator() -> Iterator
}

extension Sequence {
  public var async: AsyncLazySequence<Self> { get }
}
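
Below is a minimal sketch of AsyncLazySequence and the async property. It assumes only the declarations above. next() simply pulls from the wrapped synchronous iterator, so the resulting sequence never suspends and never throws.

```swift
// Sketch: adapts any Sequence into a non-throwing AsyncSequence.
struct AsyncLazySequence<Base: Sequence>: AsyncSequence {
    typealias Element = Base.Element

    struct Iterator: AsyncIteratorProtocol {
        var base: Base.Iterator

        // Forwards to the synchronous iterator; nil marks the end.
        mutating func next() async -> Base.Element? {
            base.next()
        }
    }

    let base: Base

    func makeAsyncIterator() -> Iterator {
        Iterator(base: base.makeIterator())
    }
}

extension Sequence {
    // Trailing syntax, mirroring `lazy`.
    var async: AsyncLazySequence<Self> {
        AsyncLazySequence(base: self)
    }
}

// A synchronous array composed with the standard AsyncSequence.map.
var doubled: [Int] = []
for await value in [1, 2, 3].async.map({ $0 * 2 }) {
    doubled.append(value)
}
print(doubled) // [2, 4, 6]
```
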

The existing Sequence APIs that lack parity are enumerated(), joined(), elementsEqual(), and zip(). We should add these methods to AsyncSequence.

extension AsyncSequence {
  public func enumerated() -> AsyncEnumeratedSequence<Self>
}

public struct AsyncEnumeratedSequence<Base: AsyncSequence>: AsyncSequence {
  public typealias Element = (offset: Int, element: Base.Element)
  public typealias AsyncIterator = Iterator
  
  public struct Iterator: AsyncIteratorProtocol {
    public mutating func next() async rethrows -> (offset: Int, element: Base.Element)?
  }
  
  public func makeAsyncIterator() -> Iterator
}
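
The declarations above could be filled in roughly as follows. This sketch assumes the iterator just forwards to the base iterator and keeps a count. The `rethrows` on next() relies on the rethrowing-conformance behavior discussed later in the thread. AsyncStream is only a sample source.

```swift
// Sketch: pairs each element with a running zero-based offset.
struct AsyncEnumeratedSequence<Base: AsyncSequence>: AsyncSequence {
    typealias Element = (offset: Int, element: Base.Element)

    struct Iterator: AsyncIteratorProtocol {
        var base: Base.AsyncIterator
        var offset = 0

        // Throws only if the base iterator throws.
        mutating func next() async rethrows -> (offset: Int, element: Base.Element)? {
            guard let element = try await base.next() else { return nil }
            let current = offset
            offset += 1
            return (offset: current, element: element)
        }
    }

    let base: Base

    func makeAsyncIterator() -> Iterator {
        Iterator(base: base.makeAsyncIterator())
    }
}

extension AsyncSequence {
    func enumerated() -> AsyncEnumeratedSequence<Self> {
        AsyncEnumeratedSequence(base: self)
    }
}

let letters = AsyncStream<String> { continuation in
    for letter in ["a", "b", "c"] { continuation.yield(letter) }
    continuation.finish()
}
var labeled: [String] = []
for await (index, letter) in letters.enumerated() {
    labeled.append("\(index):\(letter)")
}
print(labeled) // ["0:a", "1:b", "2:c"]
```
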
extension AsyncSequence where Element: AsyncSequence {
  public func joined<Separator: AsyncSequence>(separator: Separator) -> AsyncJoinedSequence<Self, Separator> where Separator.Element == Element.Element
}

public struct AsyncJoinedSequence<Base: AsyncSequence, Separator: AsyncSequence>: AsyncSequence where Base.Element: AsyncSequence, Separator.Element == Base.Element.Element {
  public typealias Element = Base.Element.Element
  public typealias AsyncIterator = Iterator
  
  public struct Iterator: AsyncIteratorProtocol {
    public mutating func next() async rethrows -> Base.Element.Element?
  }
  
  public func makeAsyncIterator() -> Iterator
}

One minor difference between JoinedSequence and AsyncJoinedSequence worth noting is that AsyncJoinedSequence must carry the Separator type in order to properly asynchronously stitch the elements together.
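
To make the Separator point concrete, here is a sketch of AsyncJoinedSequence with both generic parameters. One assumption is worth calling out. The separator is re-iterated for every gap through makeAsyncIterator(), so it has to be multi-pass, and AsyncSequence does not guarantee that. A single-consumer AsyncStream, for instance, would only yield its elements once. The AsyncArray type below is hypothetical and exists only to provide a multi-pass source.

```swift
// Sketch: emits the inner sequences in order, with separator elements between them.
struct AsyncJoinedSequence<Base: AsyncSequence, Separator: AsyncSequence>: AsyncSequence
where Base.Element: AsyncSequence, Separator.Element == Base.Element.Element {
    typealias Element = Base.Element.Element

    struct Iterator: AsyncIteratorProtocol {
        var base: Base.AsyncIterator
        let separator: Separator
        var inner: Base.Element.AsyncIterator? = nil
        var pendingSeparator: Separator.AsyncIterator? = nil
        var isFirst = true

        mutating func next() async rethrows -> Base.Element.Element? {
            while true {
                // 1. Emit any separator elements that are still pending.
                if var separatorIterator = pendingSeparator {
                    if let value = try await separatorIterator.next() {
                        pendingSeparator = separatorIterator
                        return value
                    }
                    pendingSeparator = nil
                }
                // 2. Emit elements of the current inner sequence.
                if var innerIterator = inner {
                    if let value = try await innerIterator.next() {
                        inner = innerIterator
                        return value
                    }
                    inner = nil
                }
                // 3. Advance to the next inner sequence; a separator precedes
                //    every inner sequence except the first.
                guard let nextInner = try await base.next() else { return nil }
                inner = nextInner.makeAsyncIterator()
                if !isFirst {
                    pendingSeparator = separator.makeAsyncIterator()
                }
                isFirst = false
            }
        }
    }

    let base: Base
    let separator: Separator

    func makeAsyncIterator() -> Iterator {
        Iterator(base: base.makeAsyncIterator(), separator: separator)
    }
}

extension AsyncSequence where Element: AsyncSequence {
    func joined<Separator: AsyncSequence>(separator: Separator) -> AsyncJoinedSequence<Self, Separator>
    where Separator.Element == Element.Element {
        AsyncJoinedSequence(base: self, separator: separator)
    }
}

// Hypothetical multi-pass source, used only for this example.
struct AsyncArray<Value>: AsyncSequence {
    typealias Element = Value
    let elements: [Value]

    struct AsyncIterator: AsyncIteratorProtocol {
        var base: IndexingIterator<[Value]>
        mutating func next() async -> Value? { base.next() }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(base: elements.makeIterator())
    }
}

let chunks = AsyncArray(elements: [
    AsyncArray(elements: [1, 2]),
    AsyncArray(elements: [3]),
    AsyncArray(elements: [4, 5]),
])
var flattened: [Int] = []
for await value in chunks.joined(separator: AsyncArray(elements: [0])) {
    flattened.append(value)
}
print(flattened) // [1, 2, 0, 3, 0, 4, 5]
```
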

extension AsyncSequence {
  public func elementsEqual<OtherSequence: AsyncSequence>(_ other: OtherSequence, by areEquivalent: (Element, OtherSequence.Element) async throws -> Bool) async rethrows -> Bool
}
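
Here is a sketch of elementsEqual(_:by:) that mirrors the synchronous algorithm. It advances both iterators in lockstep and returns false at the first non-equivalent pair or length mismatch. The makeStream helper is hypothetical.

```swift
extension AsyncSequence {
    func elementsEqual<OtherSequence: AsyncSequence>(
        _ other: OtherSequence,
        by areEquivalent: (Element, OtherSequence.Element) async throws -> Bool
    ) async rethrows -> Bool {
        var iterator1 = makeAsyncIterator()
        var iterator2 = other.makeAsyncIterator()
        while true {
            let element1 = try await iterator1.next()
            let element2 = try await iterator2.next()
            switch (element1, element2) {
            case (nil, nil):
                return true // both ended together
            case let (lhs?, rhs?):
                let equivalent = try await areEquivalent(lhs, rhs)
                if !equivalent { return false }
            default:
                return false // one sequence is longer than the other
            }
        }
    }
}

// Hypothetical helper that builds a finite AsyncStream from an array.
func makeStream(_ values: [Int]) -> AsyncStream<Int> {
    AsyncStream { continuation in
        for value in values { continuation.yield(value) }
        continuation.finish()
    }
}

let same = await makeStream([1, 2, 3]).elementsEqual(makeStream([1, 2, 3])) { $0 == $1 }
let shorter = await makeStream([1, 2, 3]).elementsEqual(makeStream([1, 2])) { $0 == $1 }
print(same, shorter) // true false
```
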
public func zip<Sequence1: AsyncSequence, Sequence2: AsyncSequence>(_ sequence1: Sequence1, _ sequence2: Sequence2) -> AsyncZip2Sequence<Sequence1, Sequence2>

public struct AsyncZip2Sequence<Sequence1: AsyncSequence, Sequence2: AsyncSequence>: AsyncSequence {
  public typealias Element = (Sequence1.Element, Sequence2.Element)
  public typealias AsyncIterator = Iterator
  
  public struct Iterator: AsyncIteratorProtocol {
    public mutating func next() async rethrows -> (Sequence1.Element, Sequence2.Element)?
  }
  
  public func makeAsyncIterator() -> Iterator
}
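
Below is a sketch of AsyncZip2Sequence. It assumes a simple strategy in which the first base is awaited before the second. Awaiting both concurrently would be an alternative design with different latency characteristics.

```swift
struct AsyncZip2Sequence<Sequence1: AsyncSequence, Sequence2: AsyncSequence>: AsyncSequence {
    typealias Element = (Sequence1.Element, Sequence2.Element)

    struct Iterator: AsyncIteratorProtocol {
        var iterator1: Sequence1.AsyncIterator
        var iterator2: Sequence2.AsyncIterator
        var isFinished = false

        mutating func next() async rethrows -> (Sequence1.Element, Sequence2.Element)? {
            if isFinished { return nil }
            // Like the synchronous zip, stop as soon as either side runs out.
            guard let element1 = try await iterator1.next(),
                  let element2 = try await iterator2.next() else {
                isFinished = true
                return nil
            }
            return (element1, element2)
        }
    }

    let sequence1: Sequence1
    let sequence2: Sequence2

    func makeAsyncIterator() -> Iterator {
        Iterator(iterator1: sequence1.makeAsyncIterator(),
                 iterator2: sequence2.makeAsyncIterator())
    }
}

func zip<Sequence1: AsyncSequence, Sequence2: AsyncSequence>(
    _ sequence1: Sequence1, _ sequence2: Sequence2
) -> AsyncZip2Sequence<Sequence1, Sequence2> {
    AsyncZip2Sequence(sequence1: sequence1, sequence2: sequence2)
}

let numbers = AsyncStream<Int> { continuation in
    for value in [1, 2, 3] { continuation.yield(value) }
    continuation.finish()
}
let letters = AsyncStream<String> { continuation in
    for value in ["a", "b"] { continuation.yield(value) }
    continuation.finish()
}
var pairs: [String] = []
for await (number, letter) in zip(numbers, letters) {
    pairs.append("\(number)\(letter)")
}
print(pairs) // ["1a", "2b"]
```
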

For additional N-ary variants of zip, it is relatively easy to build a three-way zip from two zips and a map:

let zipOf3 = zip(zip(a, b), c).map { ($0.0, $0.1, $1) }

The N-ary variants are not often used and are fairly trivial to compose. Given that, we feel that they are not within the scope of extending both AsyncSequence and Sequence.

Examples

func useSomeAsyncSequence<Source: AsyncSequence>(_ source: Source) async rethrows {
  let items = try await source.collect()
  // ... work with items
}

await useSomeAsyncSequence([1, 2, 3].async)

func useEnumerated<Source: AsyncSequence>(_ source: Source) async rethrows {
  for try await (index, element) in source.enumerated() {
    // ... work with index and element
  }
}

Detailed Design

This is already used in the implementation of the tests for the AsyncSequence "operators". The implementation of Sequence.async can be found here, and the implementation of AsyncSequence.collect() can be found here.

Impact on Existing Code

Other than the tests needing to be reworked to use the new implementations, this change is purely additive.

Alternatives Considered

The property on Sequence for constructing an asynchronous sequence could instead be an initializer; however, that does not compose well. AsyncLazySequence([1, 2, 3].map { $0.description }) is harder to reason about, and it misses the parity with lazy.

The collect function could be an initializer on Array, but it would suffer from the same ergonomic issue as replacing the async property on Sequence with an initializer.

Alternatively, collect could take a collector of some sort that receives the values (e.g. some sort of inout RangeReplaceableCollection, à la Kotlin). This approach offers some flexibility in the type the values are collected into, but it is less ergonomic, since it is incumbent upon the developer to know which types can conform to the required interface.
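
To illustrate the collector alternative, here is a sketch of a hypothetical collect(into:) that appends into an inout RangeReplaceableCollection, in the spirit of Kotlin Flow's toCollection. Note that the call site must first declare a mutable destination of a suitable type.

```swift
// Hypothetical API, for comparison only; not part of the pitch.
extension AsyncSequence {
    func collect<Destination: RangeReplaceableCollection>(
        into destination: inout Destination
    ) async rethrows where Destination.Element == Element {
        for try await element in self {
            destination.append(element)
        }
    }
}

// The caller picks the destination type; here a String gathers Characters.
func gatherText(_ characters: AsyncStream<Character>) async -> String {
    var text = ""
    await characters.collect(into: &text)
    return text
}

let characters = AsyncStream<Character> { continuation in
    for character in "abc" { continuation.yield(character) }
    continuation.finish()
}
let text = await gatherText(characters)
print(text) // abc
```

Compared with a trailing .collect(), the destination has to exist before the call. That is the ergonomic cost described above.
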


By "ergonomic issue", do you mean trailing syntax is better?

If the collect APIs were added to RangeReplaceableCollection, they would be usable by Array, Data, String, etc.

extension RangeReplaceableCollection {

  @inlinable
  public init<S: AsyncSequence>(
    _ elements: S
  ) async rethrows where S.Element == Element {
    self.init()
    try await append(contentsOf: elements)
  }

  @inlinable
  public mutating func append<S: AsyncSequence>(
    contentsOf elements: S
  ) async rethrows where S.Element == Element {
    for try await element in elements {
      append(element)
    }
  }
}

(And possibly also + and += operators with AsyncSequence operands.)

The problem with initializers or append is that you potentially end up wrapping a long trailing-syntax chain, something along the lines of:

someAsyncSequence
  .map { processItem($0) }
  .filter { $0.isValid }
  .flatMap { $0.results }

Chains of processing steps like this are not uncommon. One could, of course, store the chain in a temporary variable and pass that to an initializer, but more often than not it is easier to just tack on one more operation: .collect().

I am unsure whether async initializers are supported yet, but the use sites for the chain above would contrast as follows:

let results = try await someAsyncSequence
  .map { processItem($0) }
  .filter { $0.isValid }
  .flatMap { $0.results }
  .collect()

// versus
let results = try await Array(someAsyncSequence
  .map({ processItem($0) })
  .filter({ $0.isValid })
  .flatMap({ $0.results }))

That initializer is pretty gnarly to read, and it is potentially harder to type check as well.


Funnily enough I downloaded the latest toolchain this morning to play around with some of the AsyncSequence stuff. I started looking through how to move Sequence <-> AsyncSequence and couldn't see anything obvious, so this post came along at the perfect time.

What's proposed definitely seems to smooth off the rough edges of Sequence <-> AsyncSequence interoperability. In fact, I grabbed the proposed implementation, and using it just felt 'right'.

So from a first pass and a very quick play, this seems like a great addition. I need to think more deeply about how this will affect a number of use cases, but it's a great start so far.


This all requires "rethrowing protocol conformances" pitched before, which hasn't yet been reviewed or re-pitched, yes? (I'd be very interested in seeing how the authors refine that pitch in light of the issues discussed in the first pitch thread.)

@Douglas_Gregor would be able to comment on the results of that pitch better than I can, but this proposal (like other parts of AsyncSequence) defers to the outcome of that pitch. However the spelling of rethrows pans out, this proposal will adopt it, as the rest of AsyncSequence does.

This proposal generally makes sense to me. I'm sad to see so much duplication between the sync and async versions of sequences, but it is reasonable given the current design points.

On the top line question of "how does the sync and async world work together?", I'm curious about this:

Have you considered a complementary approach based around futures? In addition to, or instead of, the above, we could have a series of APIs like this:

This more closely models what is really going on with async and is pretty highly precedented in other languages that started out with futures and added async later.

It's worth observing that these are complementary approaches that enable different design patterns. I'm just curious if you've thought about this and discarded it, or haven't considered it.

-Chris

I presume you are speaking of Task.Handle<T>? Other Future implementations are definitely out of scope. Using a Task.Handle is pretty much equivalent to just calling an async function.

I think there are two different cases of asynchronous behavior here: an asynchronous sequence of values where each value is delivered asynchronously, and grabbing all of the asynchronous values in one fell swoop. The collect API is essentially a conversion from the former to the latter. So if someone wants to grab all values at some future point (as they would with a "future type"), they can store the sequence definition and call collect on it later to obtain all of the values produced.

Additionally, the rethrows should be at the same point as the asynchronous fetch of values, because errors happen asynchronously. A future type would therefore need to encapsulate the source of its throwing behavior, which means it would need to embody the source type.

In general, I am not trying to suggest any mechanism beyond the existing structured concurrency facilities.

One point worth noting in particular is that collect is pretty much isomorphic to a reduce:

func collect() async rethrows -> [Element] {
   return try await reduce(into: [Element]()) { $0.append($1) }
}

It just happens to be much easier to remember/write collect.

What happens if the sequence is “infinite”, or more or less like a stream? This is more of an architectural issue with Sequence: we are not able to distinguish stream-like sequences from regular finite sequences.

Is there anything we can do to prevent collect from never returning?

I wish async had default timeouts but the current design forces all cancellable async operations to be throwing.

Would it be feasible to have a throwing version that takes a timeout?

There is no prevention for this; it behaves the same as calling reduce on an open-ended range, e.g. (1...).reduce(0.0) { $0 + 1.0 / (Double($1) * Double($1)) }. Even though some folks may realize that the result should just be Double.pi * Double.pi / 6.0, sadly there is no way to avoid the endless loop. In the cases where finite sequences are used, collect is immensely useful, and it shouldn't be precluded by the existence of infinite sequences, just as reduce exists on non-async sequences that can be practically infinite.

As I understand it, time is a can of worms that has been deferred, so I don't want to open it without consensus. That being said, any timeout behavior that exists in the underlying sequence is expected to work with the functions on AsyncSequence. The same goes for cancellation.

That last point is worth considering with regard to infinite sources. Suppose an asynchronous sequence throws a CancellationError when its task is cancelled; then this function would throw. However, if the sequence does not throw and instead responds to cancellation by returning nil (marking the end of the stream), then cancelling would yield a partial emission of the stream, and collect would return what it has collected so far.

This means a meaningful timeout can trivially be built on top of this: you can run a detached task that returns the value produced by this method and cancel it whenever the timeout occurs (with the aforementioned behavior regarding throwing or nil-termination of the base async sequence).
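
Here is a sketch of that timeout approach. It assumes AsyncStream as the base. AsyncStream's iterator returns nil when the consuming task is cancelled, which is the nil-termination case, so the result is whatever was collected so far. collectWithTimeout is a hypothetical helper, and Task.detached is the shipped spelling of a detached task.

```swift
// Same collect() sketch as earlier, repeated so this example stands alone.
extension AsyncSequence {
    func collect() async rethrows -> [Element] {
        var result: [Element] = []
        for try await element in self {
            result.append(element)
        }
        return result
    }
}

// Hypothetical helper: collect in a detached task, then cancel it after a delay.
// The base sequence's cancellation behavior (throw vs. end with nil) decides
// whether this throws or returns a partial result.
func collectWithTimeout<Source: AsyncSequence & Sendable>(
    _ source: Source,
    nanoseconds: UInt64
) async throws -> [Source.Element] where Source.Element: Sendable {
    let task = Task.detached { try await source.collect() }
    try? await Task.sleep(nanoseconds: nanoseconds) // the "timeout"
    task.cancel()
    return try await task.value
}

// A stream that yields two values and never calls finish().
let endless = AsyncStream<Int> { continuation in
    continuation.yield(1)
    continuation.yield(2)
}
let partial = try await collectWithTimeout(endless, nanoseconds: 100_000_000)
print(partial) // [1, 2]
```

This simple version always waits the full delay, even when the source finishes early. Racing the collection against the sleep in a task group would avoid that.
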


As far as I understand, an AsyncSequence is iterated in sequential order by e.g. reduce() and collect(). Would it make sense to add something like parallelCollect() using e.g. DispatchQueue.concurrentPerform()?

Or is the async sequence itself supposed to fan out Tasks if that is desired?

Edit: Ok, that is probably not possible unless AsyncSequence.Element is a Task or something you can separately wait on... Also, you'd need a max. number of elements, I guess.

You are correct that the order is preserved. A parallel collect would be a Task.Group, since the top-level source is concurrent by nature. In short: if there was an order to start with, these reducer-style methods preserve it; if there wasn't, they of course can't materialize any order on their own.
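
The following sketch shows what such a parallel variant could look like with a task group. The pitch-era Task.Group shipped as withTaskGroup and withThrowingTaskGroup. orderedConcurrentMap is a hypothetical name. Child tasks complete in arbitrary order, so each one carries its source offset, and the results are slotted back into source order at the end.

```swift
// Hypothetical helper: transforms elements concurrently, returns them in source order.
func orderedConcurrentMap<Source: AsyncSequence, Output: Sendable>(
    _ source: Source,
    _ transform: @escaping @Sendable (Source.Element) async -> Output
) async throws -> [Output] where Source.Element: Sendable {
    try await withThrowingTaskGroup(of: (Int, Output).self, returning: [Output].self) { group in
        var count = 0
        // Elements are pulled in order; each child task remembers its offset.
        for try await element in source {
            let offset = count
            group.addTask { (offset, await transform(element)) }
            count += 1
        }
        // Children finish in arbitrary order, so place each result by offset.
        var results = [Output?](repeating: nil, count: count)
        for try await (offset, value) in group {
            results[offset] = value
        }
        // Every slot was filled by exactly one child task.
        return results.map { $0! }
    }
}

let values = AsyncStream<Int> { continuation in
    for value in 1...5 { continuation.yield(value) }
    continuation.finish()
}
let squares = try await orderedConcurrentMap(values) { (value: Int) async -> Int in
    // Larger values sleep less, so they tend to finish first.
    try? await Task.sleep(nanoseconds: UInt64(6 - value) * 10_000_000)
    return value * value
}
print(squares) // [1, 4, 9, 16, 25]
```

This spawns one child task per element with no upper bound, so the maximum-concurrency concern from the edit above still applies.
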
