Follow-up on AsyncSequence and parity with Sequence based APIs

Introduction

When writing asynchronous code with AsyncSequence adopters, it is often necessary to convert back and forth between the async world and the synchronous world. Many similar APIs provide mechanisms for this. Those conversions work best as trailing syntax on the types involved, which aids the ergonomics of call sites and code completion.

In the same vein of useful additions, a few APIs that are present on Sequence were missing from the initial AsyncSequence proposal. A number of these were omitted because they depend on language features that had not yet been proposed.

Motivation

Learning and testing in Swift are incredibly important, both as an implementation requirement and for developers using the language. The newly introduced AsyncSequence protocol offers great flexibility in expressing values over time, but there are a few places where it could use connections to the non-async world. One of those connections is the process of collecting all values and awaiting the completion of the asynchronous sequence. The other major point is the creation of stand-in or pre-computed asynchronous sequences.

For nearly all use cases AsyncSequence should have interfaces similar to those of Sequence. This gives developers a stable interface to work with and lets them expect that a prototype written as non-async code can be converted just by adopting the asynchronous protocol instead (with the caveat that symmetry only applies where it makes sense). This is a two-way street: where AsyncSequence gains mechanisms that would also make sense on Sequence, we should consider adding them there too.

Proposed Solution

To facilitate collecting values and moving from the asynchronous world into the synchronous world, we should add a collect method in an extension on AsyncSequence that gathers up all the values asynchronously and produces an array of them. The function must follow the effects of the asynchronous iteration it performs: collecting an AsyncSequence that throws will throw, and collecting one that does not throw should not throw. In other words, the method should be rethrows according to the conformance of the type it is called on. Furthermore, collecting all values must in and of itself be asynchronous.

extension AsyncSequence {
  public func collect() async rethrows -> [Element]
}

Similar to the construction of LazySequence adopters, Sequence itself can easily produce values that can be represented asynchronously. This can be achieved by a similar extension on Sequence as lazy (in this case async) and a concrete generic type that adapts a given Sequence into a suitable asynchronous sequence.

public struct AsyncLazySequence<Base: Sequence>: AsyncSequence {
  public typealias Element = Base.Element
  public typealias AsyncIterator = Iterator
  
  public struct Iterator: AsyncIteratorProtocol {
    public mutating func next() async -> Base.Element?
  }
  
  public func makeAsyncIterator() -> Iterator
}

extension Sequence {
  public var async: AsyncLazySequence<Self> { get }
}
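
A minimal sketch of how the adapter and collect() could fit together, assuming the simplest possible bodies. The stored property base, the iterator internals, and the use of plain throws (standing in for the pitched conformance-based rethrows) are assumptions for illustration, not the linked implementation.

```swift
// Hypothetical bodies for the declarations above; only the names come from the pitch.
struct AsyncLazySequence<Base: Sequence>: AsyncSequence {
  typealias Element = Base.Element

  struct Iterator: AsyncIteratorProtocol {
    var base: Base.Iterator

    // Forwards to the synchronous iterator; nothing here actually suspends.
    mutating func next() async -> Base.Element? {
      base.next()
    }
  }

  let base: Base

  func makeAsyncIterator() -> Iterator {
    Iterator(base: base.makeIterator())
  }
}

extension Sequence {
  // Trailing-syntax entry point, mirroring `lazy`.
  var async: AsyncLazySequence<Self> { AsyncLazySequence(base: self) }
}

extension AsyncSequence {
  // Assumption: plain `throws` is used because this sketch does not rely on
  // rethrowing protocol conformances.
  func collect() async throws -> [Element] {
    var result: [Element] = []
    for try await element in self {
      result.append(element)
    }
    return result
  }
}
```

With these in place, a call site such as try await [1, 2, 3].async.map { $0 * 2 }.collect() reads as one trailing chain.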

In the category of existing APIs missing parity are enumerated(), joined(), elementsEqual(), and zip(). We should add these methods to AsyncSequence.

extension AsyncSequence {
  public func enumerated() -> AsyncEnumeratedSequence<Self>
}

public struct AsyncEnumeratedSequence<Base: AsyncSequence>: AsyncSequence {
  public typealias Element = (offset: Int, element: Base.Element)
  public typealias AsyncIterator = Iterator
  
  public struct Iterator: AsyncIteratorProtocol {
    public mutating func next() async rethrows -> (offset: Int, element: Base.Element)?
  }
  
  public func makeAsyncIterator() -> Iterator
}

extension AsyncSequence where Element: AsyncSequence {
  public func joined<Separator: AsyncSequence>(separator: Separator) -> AsyncJoinedSequence<Self, Separator>
}

public struct AsyncJoinedSequence<Base: AsyncSequence, Separator: AsyncSequence>: AsyncSequence where Base.Element: AsyncSequence {
  public typealias Element = Base.Element.Element
  public typealias AsyncIterator = Iterator
  
  public struct Iterator: AsyncIteratorProtocol {
    public mutating func next() async rethrows -> Base.Element.Element?
  }
  
  public func makeAsyncIterator() -> Iterator
}

One minor difference between JoinedSequence and AsyncJoinedSequence worth noting is that AsyncJoinedSequence must carry the Separator type in order to properly asynchronously stitch the elements together.

extension AsyncSequence {
  public func elementsEqual<OtherSequence: AsyncSequence>(_ other: OtherSequence, by areEquivalent: (Element, OtherSequence.Element) async throws -> Bool) async rethrows -> Bool
}

public func zip<Sequence1: AsyncSequence, Sequence2: AsyncSequence>(_ sequence1: Sequence1, _ sequence2: Sequence2) -> AsyncZip2Sequence<Sequence1, Sequence2>

public struct AsyncZip2Sequence<Sequence1: AsyncSequence, Sequence2: AsyncSequence>: AsyncSequence {
  public typealias Element = (Sequence1.Element, Sequence2.Element)
  public typealias AsyncIterator = Iterator
  
  public struct Iterator: AsyncIteratorProtocol {
    public mutating func next() async rethrows -> (Sequence1.Element, Sequence2.Element)?
  }
  
  public func makeAsyncIterator() -> Iterator
}

For additional N-ary variants of zip, it is relatively easy to build a three-part version from two zips and a map:

let zipOf3 = zip(zip(a, b), c).map { ($0.0, $0.1, $1) }

The N-ary variants are not often used and are fairly trivial to compose. Given that, we feel that they are not within the scope of extending both AsyncSequence and Sequence.

Examples

func useSomeAsyncSequence<Source: AsyncSequence>(_ source: Source) async rethrows {
  let items = try await source.collect()
  ... 
}

await useSomeAsyncSequence([1, 2, 3].async)

func useEnumerated<Source: AsyncSequence>(_ source: Source) async rethrows {
   for try await (index, element) in source.enumerated() {
     ...
   }
}

Detailed Design

This is already being used in the implementation for the tests for the AsyncSequence "operators". The implementation for Sequence.async can be found here and the implementation for AsyncSequence.collect() can be found here

Impact on Existing Code

Other than the tests needing to be reworked to use the new implementations this is purely additive.

Alternatives Considered

The property on Sequence for constructing an asynchronous sequence could be just an initializer; however, that won't work well for composition. AsyncLazySequence([1, 2, 3].map { $0.description }) is a bit hard to reason about and it misses the parity with lazy.

The function collect could be an initializer for Array, but it suffers from the same ergonomic issue as the async extension property on Sequence.

Alternatively collect could take a collector of sorts that gets values (e.g. some sort of inout RangeReplaceableCollection a la Kotlin). This approach offers some flexibility of the type the values are collected into however it does not lend itself to ergonomics-of-use since it is incumbent upon the developer to know that a given type can conform to the given interface.

By "ergonomic issue", do you mean trailing syntax is better?

If the collect APIs were added to RangeReplaceableCollection, they would be usable by Array, Data, String, etc.

extension RangeReplaceableCollection {

  @inlinable
  public init<S: AsyncSequence>(
    _ elements: S
  ) async rethrows where S.Element == Element {
    self.init()
    try await append(contentsOf: elements)
  }

  @inlinable
  public mutating func append<S: AsyncSequence>(
    contentsOf elements: S
  ) async rethrows where S.Element == Element {
    for try await element in elements {
      append(element)
    }
  }
}

(And possibly also + and += operators with AsyncSequence operands.)

The problem with initializers or append would be that you will potentially have a large trailing syntax something along the lines of:

someAsyncSequence
  .map { processItem($0) }
  .filter { $0.isValid }
  .flatMap { $0.results }

Multi-step pipelines like this are not uncommon. One could of course store the result in a temporary variable and then initialize from it, but more often than not it is easier to just tack on one more operation: .collect().

I am unsure if async initializers are a thing yet or not. But the use site for the previous would have a contrast of the following:

let results = try await someAsyncSequence
  .map { processItem($0) }
  .filter { $0.isValid }
  .flatMap { $0.results }
  .collect()

// versus
let results = try await Array(someAsyncSequence
  .map({ processItem($0) })
  .filter({ $0.isValid })
  .flatMap({ $0.results }))

Which is a pretty gnarly initializer to read but also potentially hard to type check.

Funnily enough I downloaded the latest toolchain this morning to play around with some of the AsyncSequence stuff. I started looking through how to move Sequence <-> AsyncSequence and couldn't see anything obvious, so this post came along at the perfect time.

What's proposed definitely seems to smooth off the rough edges of Sequence <-> AsyncSequence interoperability. In fact I grabbed the proposed implementation and using it just felt 'right'.

So from a first pass and very quick play this seems like a great addition. I need to have a deeper think about how this will affect a number of use cases, but a great start so far.

This all requires "rethrowing protocol conformances" pitched before, which hasn't yet been reviewed or re-pitched, yes? (I'd be very interested in seeing how the authors refine that pitch in light of the issues discussed in the first pitch thread.)

@Douglas_Gregor would be able to comment better than I can on the results of that pitch, but this (like the other parts of AsyncSequence) defers to the outcome of that pitch. However the spelling of rethrows pans out, this will adopt it, as the rest of AsyncSequence does.

This proposal generally makes sense to me. I'm sad to see so much duplication between the sync and async versions of sequences, but it is reasonable given the current design points.

On the top line question of "how does the sync and async world work together?", I'm curious about this:

Have you considered a complementary approach based around futures? In addition to or instead of the above, we could have a series of APIs like this:

This more closely models what is really going on with async and is pretty highly precedented in other languages that started out with futures and added async later.

It's worth observing that these are complementary approaches that enable different design patterns. I'm just curious if you've thought about this and discarded it, or haven't considered it.

-Chris

I presume you are speaking of Task.Handle<T>? Other Future implementations are definitely out of scope. The Task.Handle side of things is pretty much the same as it just being an async function.

I think there are two different cases of asynchronous behavior here: an asynchronous sequence of values where each value is asynchronously delivered, or grabbing all of the asynchronous values in one fell swoop. The collect API is basically a conversion from the former to the latter. So if someone wants to grab all values at some future point (as they would with a "future type"), they can store the sequence definition and then at some later point call collect on it to determine all of the values produced.

Additionally, the rethrows should be at the same point as the asynchronous fetch of values, because errors happen asynchronously. This would mean that the future type would need to encapsulate the source of its throwy-ness, which in turn means that the future type would need to embody the source type.

In general I am not trying to suggest any mechanism other than the existing structured concurrency facilities.

One point particularly worth noting is that collect is pretty much isomorphic to a reduce:

func collect() async rethrows -> [Element] {
   return try await reduce(into: [Element]()) { $0.append($1) }
}

It just happens to be much easier to remember/write collect.

What happens if the sequence is “infinite” or more or less like a stream? This is more of an architecture issue with sequence that we are not able to distinguish stream like sequences from regular finite sequences.

Is there anything we can do to prevent collect from not returning?

I wish async had default timeouts but the current design forces all cancellable async operations to be throwing.

Would it be feasible to have a throwing version that takes a timeout?

There is no way to prevent this; it will behave the same as calling reduce on an open-ended range, e.g. (1...).reduce(0.0) { $0 + 1.0 / Double($1 * $1) }. Even though some folks may realize that the result should just be Double.pi * Double.pi / 6.0, sadly there is no way to avoid the computation never finishing. collect is immensely useful where finite sequences are used, and it shouldn't be precluded by the existence of infinite sequences, similar to how reduce exists on non-async sequences that can be practically infinite.

Time is a can of worms that has been deferred, from my understanding, so I don't feel like opening that without consensus. That being said, any timeout behavior that exists in the underlying sequence will be expected to work with the functions present on async sequence. Similarly for cancel.

That last point is something to consider with regard to infinite sources. If, say, the behavior of an asynchronous sequence is to throw a CancellationError when the task is cancelled, then this function would throw. However, if cancellation does not throw and instead returns nil (marking the end of the stream), then a cancel would yield a partial emission of the stream and collect would return what it has gathered so far.

This means that a meaningful timeout can trivially be built on top of this: you can run a detached task that returns the value produced by this method and cancel it whenever the timeout occurs (with the aforementioned behavior with regard to throwing versus nil-termination of the base async sequence).
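
As a rough sketch of that layering, assuming a hypothetical helper named collect(_:timeout:) and an unstructured Task in place of a detached one: one task gathers the values, a second one sleeps and then cancels the first. Whether the caller gets a partial array or an error depends entirely on how the base sequence reacts to cancellation, as described above.

```swift
// Hypothetical helper, not part of the pitch: gathers values from `source`
// until it ends on its own or `seconds` have elapsed.
func collect<S: AsyncSequence & Sendable>(
  _ source: S,
  timeout seconds: Double
) async throws -> [S.Element] where S.Element: Sendable {
  // Worker task: a plain loop that accumulates every delivered value.
  let worker = Task {
    var values: [S.Element] = []
    for try await value in source {
      values.append(value)
    }
    return values
  }

  // Timer task: cancels the worker once the timeout has passed.
  let timer = Task {
    try await Task.sleep(nanoseconds: UInt64(seconds * 1_000_000_000))
    worker.cancel()
  }
  // If the worker finishes first, the timer is no longer needed.
  defer { timer.cancel() }

  // Returns the partial result for nil-terminating sources and rethrows
  // for sources that throw on cancellation.
  return try await worker.value
}
```

AsyncStream is an example of a source that ends iteration with nil when the consuming task is cancelled, so a never-finishing stream passed to this helper would hand back whatever it had delivered before the timeout.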

As far as I understand, an AsyncSequence is iterated in sequential order by e.g. reduce() and collect(). Would it make sense to add something like parallelCollect() using e.g. DispatchQueue.concurrentPerform()?

Or is the async sequence itself supposed to fan out Tasks if that is desired?

Edit: Ok, that is probably not possible unless AsyncSequence.Element is a Task or something you can separately wait on... Also, you'd need a max. number of elements, I guess.

You are correct that the order is preserved. The parallel collect would be Task.Group, since the top-level source there is concurrent by nature. In short: if there was an order to start with, these reducer-style methods preserve that order; if there wasn't, they of course can't materialize any order on their own.
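
To illustrate the task-group route, here is a small sketch using a hypothetical concurrentMap helper. It fans the work out into child tasks and then gathers the results by iterating the group, so the output order is completion order rather than input order.

```swift
// Hypothetical helper: runs `transform` concurrently for every input.
func concurrentMap<Input: Sendable, Output: Sendable>(
  _ inputs: [Input],
  _ transform: @escaping @Sendable (Input) async -> Output
) async -> [Output] {
  await withTaskGroup(of: Output.self) { group in
    // Fan out: one child task per input.
    for input in inputs {
      group.addTask { await transform(input) }
    }

    // The group is itself an AsyncSequence of child-task results, so
    // gathering it is the same sequential loop that collect() would run.
    var results: [Output] = []
    for await result in group {
      results.append(result)
    }
    return results
  }
}
```

If the original order matters, one option is to carry the index along with each result and sort afterwards.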

Since this helped bridge some experimentation code and I can see it being useful for others in future, I'm wondering what the status of this discussion is @Philippe_Hausler?

I'd just like to bump this thread, given recent discussions about how to make task-groups feel more integrated in the language.

It's surprising that the standard library doesn't offer support for, say, creating an Array, or a Dictionary, Set, etc, from an AsyncSequence. We have support for regular Sequence all over the place; it's something you kind of take for granted, but these little conveniences really help make the system feel like a cohesive whole, where everything just works together.

It isn't difficult to write these things yourself, but you shouldn't have to. It means you lose a lot of things like type inference, and it just feels like a basic operation that should be built-in:

var result: [Key: Value] = [:]
for await (key, value) in sequence {
  result[key] = value
}
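
As a sketch of what a built-in could look like, assuming a hypothetical init(lastValues:) label and plain throws rather than conformance-based rethrows:

```swift
extension Dictionary {
  // Hypothetical initializer: builds a dictionary from an async sequence of
  // key-value pairs. For duplicate keys the last delivered value wins.
  init<S: AsyncSequence>(lastValues pairs: S) async throws
  where S.Element == (Key, Value) {
    self.init()
    for try await (key, value) in pairs {
      self[key] = value
    }
  }
}
```

The call site then gets Key and Value inferred from the sequence's element type, e.g. let result = try await Dictionary(lastValues: sequence).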

Two more points I'd like to make:

  1. Naming.

    I see the appeal of something like sequence.collect() over Array(sequence), but the latter is more familiar and scales better. Arrays are probably the most common collection anybody uses, but Dictionary, Set, etc - Data, OrderedDictionary, Deque, what-have-you, are important, too; it's hard to scale sequence.collect() to all possible data types.

  2. Performance.

    Sequence has things like underestimatedCount if you need to reserve capacity in advance. I don't see a similar API on AsyncSequence. Array has an exponential growth strategy, so if you append a lot in a very large loop, each individual append will be constant-time on average at the cost of occupying a lot more memory. That's why we generally advise people to use reserveCapacity(sequence.underestimatedCount) if possible.

    It's possible that AsyncSequence won't be able to guarantee an under-estimate, as it may represent a remote resource that could change. Perhaps it could be extended to add an estimatedCount, and be allowed to return an over-estimate.

    Also, Sequence has a bunch of hooks for more efficient creation of Arrays (_copyToContiguousArray), initialisation of unsafe buffers, etc. Again, those kinds of things would make great additions to AsyncSequence (e.g. I'd imagine AsyncStream could implement this to bulk-copy anything it had buffered, and that's probably even more of a benefit for an async loop, because it means fewer suspension points).

Once you start thinking about real parity in how Sequence and AsyncSequence are used, these kinds of gaps become more apparent. I think it's important that we address as much of this as possible before the feature ships.

Thanks for bringing this back up - it is something that really should be addressed quickly in my opinion. Else we may lose out on some important ideas here that are quite useful.

The problem that I have with the initializer strategy is that it becomes quite cumbersome in larger applications. Compared with other functional frameworks, which might have definitions like someSource.map { doStuff($0) }.filter { $0.property == true }.flatMap { $0.items }, participating in that "chaining" style has a distinct appeal. Perhaps the right answer is: why not both? We could implement collect in terms of the Array initializer.

I don't think we can make a generalized initializer available for overload on other sequences though. The ship has sailed per required initializers for even things like RangeReplaceableCollection (which IIUC is the one that could have a default implementation). So the best we could hope for is to implement it by hand on every single type we think it might make sense on, which I am not sure really scales how you would hope. Perhaps I am missing some brilliant idea here on how to approach that differently.

I'm not sure what an estimated count would be. Would it just return Int.max by default? It seems like a bad idea to initialize an array reserving Int.max elements...

This is something that I have thought about a decent amount: and granted the inliner is super powerful and makes most of this not needed... I still see where it could be useful:
Perhaps we could have an additional requirement (non underscore) to AsyncSequence that has a default implementation.

@rethrows
protocol AsyncIteratorProtocol {
  associatedtype Element /*: Sendable */

  mutating func next() async rethrows -> Element?
  mutating func withNextUnsafeBuffer(_ apply: (UnsafeBufferPointer<Element>) -> Void) async rethrows
}

extension AsyncIteratorProtocol {
  // Default implementation: hands over one element at a time.
  mutating func withNextUnsafeBuffer(_ apply: (UnsafeBufferPointer<Element>) -> Void) async rethrows {
    // Nothing to hand over once the iterator is exhausted.
    guard var element = try await next() else { return }
    withUnsafePointer(to: &element) { ptr in
      apply(UnsafeBufferPointer(start: ptr, count: 1))
    }
  }
}

My understanding is that it's still possible to add requirements to a protocol, so long as they have a default implementation. Are you saying that doesn't apply to initialisers?

No, it would still be better to underestimate (the default implementation could return 0, as it does for Sequence), but, for example, Array.append(contentsOf:) will trap if, after consuming the sequence, there happened to be fewer elements than expected. It could just shift the tail contents down, but instead it has chosen to be strict.

Code using AsyncSequence should be more lenient. Any types which reserve capacity for n elements will have to accept that the sequence might end up emitting more or less than n elements.
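
A sketch of that lenient behavior, assuming a hypothetical collect(reserving:) method where the count is only a caller-supplied hint: capacity is reserved up front, but the loop accepts however many elements actually arrive.

```swift
extension AsyncSequence {
  // Hypothetical variant of collect: `estimatedCount` is a hint, not a promise.
  // Assumption: plain `throws` stands in for conformance-based rethrows.
  func collect(reserving estimatedCount: Int) async throws -> [Element] {
    var result: [Element] = []
    // `Swift.max` is spelled out because AsyncSequence has its own max().
    result.reserveCapacity(Swift.max(0, estimatedCount))

    // Unlike Array.append(contentsOf:), nothing traps if the sequence
    // delivers more or fewer elements than estimated.
    for try await element in self {
      result.append(element)
    }
    return result
  }
}
```

An over-estimate only costs some unused capacity, and an under-estimate falls back to Array's usual growth strategy.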

TaskGroup in particular should be able to say exactly how many tasks are in the group.

withNextUnsafeBuffer looks interesting for consuming the sequence in batches. Unsafe buffers are interesting because most custom storage types can provide them, and in the worst case you can just make them from a single element, which makes them a good universal type without involving copying or allocations. It's unfortunate that they have to be unsafe. I'm not fond of telling people that, in order to batch-consume elements, they'd need to drop down to a type with no bounds-checking.

I wish we had a BoundsCheckedUnsafeBufferPointer (which would only be unsafe because of lifetimes; so the only thing you'd have to worry about is not escaping it. Some complex collection algorithm with an off-by-one error wouldn't silently violate memory safety).

The one issue with it would be that protocol is defined in the standard library and not the concurrency library - so I am not sure how we can square that circle of the _Concurrency library requiring the standard library and the standard library requiring _Concurrency.

One thing which I've noticed is the lack of a forEach(_:) for AsyncSequence, which would allow for clean and complete FRP-like function chains. AFAIU, could it be simply implemented like this:

extension AsyncSequence  {
    func forEach(_ body: (Element) async throws -> Void) async rethrows {
        for try await element in self {
            try await body(element)
        }
    }
}

Now, for example, one can do this:

await userStream
    .map(\.name)
    .filter { $0 != "deadbeef" }
    .compactMap(Int.init)
    .forEach { await userActor.send($0) }

instead of this:

for await user in userStream
        .map(\.name)
        .filter { $0 != "deadbeef" }
        .compactMap(Int.init) {
    await userActor.send(user)
}

Which I find feels much more awkward (indentation, weird braces, presence of both shorthand function parameters and the element label).

Of course, the method could be renamed to something like awaitForEach(_:) or forEachAwait(_:) to more clearly show that this is an asynchronous for await loop. I don't think calling it something closer to FRP "jargon" (eg. sink(_:), onNext(_:)) is a good idea, since it should feel similar to using Sequence.

Do you think this would be a good addition to AsyncSequence?

I specifically avoided that one since it evoked some push-back. Mainly, the for-await loop in the example you posted does pretty much the same thing, and it was expressed that having two ways to accomplish the same goal might be too confusing for people learning a new system.

I tend to disagree with that sentiment, since AsyncSequence should have consistency and parity with Sequence. So I will let the rest of the community weigh in here to perhaps convince those that might dissent from this perspective.

Writing it as you did does sidestep the token return that some other FRP systems have; not sure if that is a good or bad thing, tbh.

I understand those who do not want to fracture how you can execute a for-await loop, however I agree that forEach(_:) would be necessary to gain full parity with Sequence and co. I think the lack of a return token demonstrates the strengths of AsyncSequence: functionality which is usually thrown at the user at the library level is now handled by the language itself.

I'd love to hear what other people think though – I for one definitely think forEach(_:) is an important addition to AsyncSequence, not only for ergonomics but also to gain parity with Sequence.
