[Concurrency] AsyncSequence

I'm the last to defend Sequence (or the whole hierarchy of collection protocols ;-), and if it were up to me, I wouldn't consider adding AsyncSequence. But as far as I can see, this concept is much more "sequential" than its precedent (which clashes heavily with long-established definitions):
Asking a set for a prefix doesn't make sense, but every example of AsyncSequence I can think of has a meaningful order. Because elements arrive one after another, the timing defines that order, and you can build on top of this premise.

I'm glad not to see some problematic Sequence methods like reversed in the list, but I'd also remove count().
Also, I would at least hold off on including the protocol until it is possible to define getters that may throw: it would be unfortunate if first were (and stayed!) a method just because of this limitation.
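For illustration, here is a minimal sketch of what a property-style `first` could look like, assuming Swift 5.5 or later, where effectful read-only properties (SE-0310) allow `get async throws`. The name `firstElement` is hypothetical and is not a standard-library member; the usage shows an `AsyncStream` whose elements are vended in the order they were yielded.

```swift
// Sketch: assumes Swift 5.5+ effectful read-only properties (SE-0310).
// `firstElement` is a hypothetical name, not a standard-library member.
extension AsyncSequence {
    /// The first element vended, or `nil` if the sequence is empty.
    /// Note: this starts a fresh iteration and abandons it after one element.
    var firstElement: Element? {
        get async throws {
            var iterator = makeAsyncIterator()
            return try await iterator.next()
        }
    }
}

// Usage: elements are vended in the order they were yielded.
let arrivals = AsyncStream<Int> { continuation in
    for n in [10, 20, 30] { continuation.yield(n) }
    continuation.finish()
}
let head = try await arrivals.firstElement
```

Whether such a property belongs on a single-pass sequence at all is exactly the design question raised here; the sketch only shows that the spelling is expressible.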


I'll start with the core protocols, with some documentation added.

/// A generator for a possibly-infinite (multi-)set of values, each vended
/// asynchronously.
protocol AsynchronousIteratorProtocol {
    /// The type of objects vended by the iterator.
    associatedtype Element
    /// The type of errors possibly thrown by `next()`.
    associatedtype Error: Swift.Error

    /// Returns a not-yet-vended element from the virtual set; or `nil` if that
    /// set has been exhausted, a previous call threw an error, or vending has
    /// been either canceled or otherwise terminated.
    mutating func next() async throws(Error) -> Element?
    /// Sends the instance's iteration state into termination, after being
    /// called by the runtime whenever the surrounding context ends iteration
    /// early.
    mutating func cancel()
}

/// A coupler between a possibly-infinite (multi-)set of values and a `for`-`in`
/// loop that can visit all the members asynchronously.
protocol AsynchronousCollective {
    /// The type of objects vended by the collective.
    associatedtype Element
    /// The generator that actually provides access to the elements.
    associatedtype AsynchronousIterator: AsynchronousIteratorProtocol where AsynchronousIterator.Element == Element

    /// Returns an iterator that can asynchronously visit every element of the
    /// collective, each value exactly as many times as its multiplicity.
    func makeIterator() -> AsynchronousIterator

    /// Returns whether the given value is a member of the virtual set, or `nil`
    /// if either that determination can't be optimized to be superior to linear
    /// time or if `Element` doesn't conform to `Equatable`.
    func _customContainsEquatableElement(_ element: Element) async throws(AsynchronousIterator.Error) -> Bool?

    /// Copy as many elements as possible into the given buffer, returning a
    /// continuation point for the longer operand.
    func _copyContents(initializing pointer: UnsafeMutableBufferPointer<Element>) async throws(AsynchronousIterator.Error) -> (;AsynchronousIterator, UnsafeMutableBufferPointer<Element>.Index;)
}

Here, throws(Whatever) is a typed-throws specifier. A non-throwing method would use Never as the error type, and a general throwing method would use Swift.Error as the error type (since it self-conforms). The "(; Whatever1, Whatever2 ;)" syntax is for sum-type tuples, i.e. a replacement for the Either type used in several functional languages.
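Typed throws has since become real Swift syntax (SE-0413, Swift 6), while the `(; A, B ;)` sum-tuple syntax has not. The sketch below, assuming a Swift 6 toolchain, shows the three error-type cases described here, and uses a hypothetical `Either` enum as today's spelling of the proposed sum tuple. `copyPrefix` is a hypothetical, synchronous analogue of `_copyContents`, not part of the design: it reports a resume point for whichever operand was longer.

```swift
// Sketch: assumes Swift 6 typed throws (SE-0413). The `(; A, B ;)` sum tuple
// does not exist, so a hypothetical `Either` enum stands in for it.
enum DigitError: Error { case notADigit(Character) }

/// Can throw only `DigitError`.
func digitValue(_ c: Character) throws(DigitError) -> Int {
    guard let v = c.wholeNumberValue else { throw DigitError.notADigit(c) }
    return v
}

/// `throws(Never)` is a spelled-out "cannot throw".
func answer() throws(Never) -> Int { 42 }

/// `throws(any Error)` is equivalent to plain `throws`.
func anything() throws(any Error) -> Int { try digitValue("7") }

/// Today's stand-in for the proposed sum tuple.
enum Either<Left, Right> {
    case left(Left)
    case right(Right)
}

/// Hypothetical analogue of `_copyContents`: copies into a fixed-capacity
/// buffer and reports where to resume for whichever operand was longer.
func copyPrefix(_ source: [Int], capacity: Int) -> ([Int], Either<ArraySlice<Int>, Int>) {
    if source.count > capacity {
        // Source was longer: hand back the unread remainder.
        return (Array(source.prefix(capacity)), .left(source[capacity...]))
    }
    // Buffer was at least as long: report how many slots were filled.
    return (source, .right(source.count))
}
```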

There are several axes along which an asynchronous collective may be extended: finite vs. infinite, single- vs. multi-pass, whether publication order matters, indexed access to elements or not, mutable access to elements or not, and mutable rearrangement of elements or not.

/// An asynchronous collective whose virtual multi-set of elements is
/// finite, meaning that all iterators that don't get canceled can call `next()`
/// only a finite number of times until `nil` gets returned.
protocol FiniteCollective: AsynchronousCollective {
    /// A lower bound for the number of elements that will be vended if a
    /// returned iterator is not canceled, calculated nondestructively.
    var underestimatedCount: Int { get }

    /// Confirms whether the collective has no elements, possibly destructively.
    func checksAsEmpty() async throws(AsynchronousIterator.Error) -> Bool
    /// Counts the number of elements in the collective, possibly destructively.
    func postCount() async throws(AsynchronousIterator.Error) -> Int

    /// Copy all the elements into a native array buffer, maintaining the order
    /// vended.
    __consuming func _copyToContiguousArray() async throws(AsynchronousIterator.Error) -> ContiguousArray<Element>
}

/// An asynchronous collective where the order that its elements are vended will
/// be part of the semantics, and said order is stable for a given state of the
/// virtual multi-set.
protocol AsynchronousSequence: AsynchronousCollective {}

/// An asynchronous collective that can support multiple passes, via iterators
/// that can mutate their iteration state without corrupting other instances or
/// the collective.
protocol ReusableCollective: AsynchronousCollective {}

/// An asynchronous collective with even stronger support for multiple passes,
/// via iterators that are safe from corruption even if the collective is later
/// mutated.
protocol ReusablePureCollective: ReusableCollective {}

/// An asynchronous collective that vends its elements in the same order from
/// every returned iterator instance (as long as the virtual multi-set isn't
/// mutated).
protocol PersistentCollective: ReusableCollective {}

/// An asynchronous collective that can also nondestructively reference its
/// elements directly via index tokens.
protocol IndexedCollective: ReusableCollective {
    /// A token to directly access an element.
    associatedtype Index: Equatable
    /// A set of tokens for direct access to all the elements.
    associatedtype Indices: IndexedCollective where Indices.Element == Index, Indices.Index == Index

    /// Accesses the element that the given token points to.
    subscript(position: Index) -> Element { get }
    /// A collective providing all of this collective's elements' tokens.
    var indices: Indices { get }

    /// Returns the index of some element of the collective that equals the
    /// given value, `.some(nil)` if no element matches the value, or `nil` if
    /// either that determination can't be optimized to be superior to linear
    /// time or if `Element` doesn't conform to `Equatable`.
    func _customIndexOfEquatableElement(_ element: Element) async throws(Indices.AsynchronousIterator.Error) -> Index??
}

/// An asynchronous collective that can mutate its elements.
protocol MutableCollective: IndexedCollective {
    /// Accesses the element that the given token points to, for both read and
    /// write.
    subscript(position: Index) -> Element { get set }

    /// Exchanges the values at the specified indices of the collective.
    mutating func swapAt(_ i: Index, _ j: Index)
}

/// An asynchronous collective that can remove arbitrary elements or reserve
/// space for new ones.
protocol DeletingCollective: IndexedCollective, FiniteCollective where Indices: FiniteCollective {
    /// The type of errors possibly thrown by `remove(along:)`.
    associatedtype RemovalError: Swift.Error

    /// Prepares, as well as possible, the collective to store the given number
    /// of elements.
    mutating func reserveCapacity(_ amount: Int)

    /// Removes the elements at the given indices.
    mutating func remove<C: AsynchronousCollective>(along: C) async throws(RemovalError) where C.Element == Index

    /// Remove every element from the collective.
    mutating func removeAll(keepingCapacity: Bool)
}
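The `_custom...` hooks above use a three-way `Index??` answer, following the same convention as the standard library's underscored `Collection` hooks. As a minimal illustration, here is a hypothetical synchronous `SortedInts` type (not part of the design) that can always beat a linear scan via binary search, plus a caller that distinguishes the three cases.

```swift
// Sketch of the `Index??` convention on a hypothetical `SortedInts` type.
struct SortedInts {
    /// Assumed to be sorted in ascending order.
    let storage: [Int]

    /// `nil`: no faster-than-linear answer (never the case for this type).
    /// `.some(nil)`: definitely absent. `.some(i)`: found at index `i`.
    func customIndex(of value: Int) -> Int?? {
        var low = 0
        var high = storage.count
        while low < high {  // binary search for the first element >= value
            let mid = (low + high) / 2
            if storage[mid] < value { low = mid + 1 } else { high = mid }
        }
        if low < storage.count && storage[low] == value {
            return .some(low)
        }
        return .some(nil)
    }
}

/// How a generic caller would consume the three-way answer.
func describe(_ answer: Int??) -> String {
    switch answer {
    case .none: return "fall back to a linear scan"
    case .some(.none): return "absent"
    case .some(.some(let i)): return "found at \(i)"
    }
}
```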

You can generally just compose protocols to get the capabilities you need. I haven't included methods like forEach or map yet; I'll figure those out tomorrow. Some combinations of protocols enable additional customization points.
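To make the composition idea concrete on top of today's `AsyncSequence`, here is a sketch with two hypothetical marker protocols, `FiniteAsyncSequence` and `MultiPassAsyncSequence` (illustrative names, not part of this design or the standard library). A generic function that needs two passes can then demand both capabilities in its signature; `Replay` is a toy conformer that replays an array on every pass.

```swift
// Sketch: hypothetical marker protocols layered on today's `AsyncSequence`.
/// Promises that iteration always terminates.
protocol FiniteAsyncSequence: AsyncSequence {}
/// Promises that every `makeAsyncIterator()` starts an independent pass.
protocol MultiPassAsyncSequence: AsyncSequence {}

/// Compiles only for types that opt into both capabilities.
func countThenCollect<S: FiniteAsyncSequence & MultiPassAsyncSequence>(
    _ source: S
) async throws -> (count: Int, elements: [S.Element]) {
    var count = 0
    for try await _ in source { count += 1 }  // first pass
    var elements: [S.Element] = []
    for try await element in source { elements.append(element) }  // second pass
    return (count, elements)
}

/// A toy conformer that replays a fixed array on every pass.
struct Replay: FiniteAsyncSequence, MultiPassAsyncSequence {
    typealias Element = Int
    let values: [Int]

    struct AsyncIterator: AsyncIteratorProtocol {
        var base: IndexingIterator<[Int]>
        mutating func next() async -> Int? { base.next() }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(base: values.makeIterator())
    }
}
```

Marker protocols like these can only document a promise; nothing stops a conformer from lying, which is one reason the semantics matter when the hierarchy is fixed by ABI.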

/// An asynchronous collective that may cache its elements in a memory buffer.
protocol PersistentFiniteCollective: PersistentCollective, FiniteCollective {
    /// Returns the result of applying the given closure to the in-memory buffer
    /// caching the elements of this collective, either creating the buffer if
    /// necessary or returning `nil` instead.
    func withContiguousStorageIfAvailable<R>(
        _ body: (UnsafeBufferPointer<Element>) throws -> R
    ) rethrows -> R?
}

/// An asynchronous collective that can mutate its elements through a buffer.
protocol MutablePersistentFiniteCollective: PersistentFiniteCollective {
    /// Returns the result of applying the given closure to the in-memory
    /// writable buffer containing the elements of this collective, either
    /// creating the buffer if necessary or returning `nil` instead.
    mutating func withContiguousMutableStorageIfAvailable<R>(
        _ body: (inout UnsafeMutableBufferPointer<Element>) throws -> R
    ) rethrows -> R?
}

This includes, of course, an adaptation of the Collection hierarchy.

/// An asynchronous sequence that can also nondestructively reference its
/// elements directly via index tokens and provide linear traversal of said
/// tokens.
protocol AsynchronousCollection
  : AsynchronousSequence, PersistentFiniteCollective, IndexedCollective
   where Index: Comparable, Indices: AsynchronousCollection, Indices.SubSequence == Indices, Indices.AsynchronousIterator.Error == Never
{
    /// The type representing contiguous subsequences (*i.e.* substrings) of the
    /// collection.
    associatedtype SubSequence: AsynchronousCollection where SubSequence.Element == Element, SubSequence.Index == Index, SubSequence.SubSequence == SubSequence

    /// The position of the first element in a nonempty collection.
    var startIndex: Index { get }
    /// The collection's "past the end" position, that is, the position one
    /// greater than the last valid subscript argument.
    var endIndex: Index { get }

    /// Accesses a contiguous subrange of the collection's elements.
    subscript(bounds: Range<Index>) -> SubSequence { get }

    /// Returns the index of the first element of the collection that equals the
    /// given value, `.some(nil)` if no element matches the value, or `nil` if
    /// either that determination can't be optimized to be superior to linear
    /// time or if `Element` doesn't conform to `Equatable`.
    func _customFirstIndexOfEquatableElement(_ element: Element) async -> Index??
    /// Returns the index of the last element of the collection that equals the
    /// given value, `.some(nil)` if no element matches the value, or `nil` if
    /// either that determination can't be optimized to be superior to linear
    /// time or if `Element` doesn't conform to `Equatable`.
    func _customLastIndexOfEquatableElement(_ element: Element) async -> Index??

    /// Returns an index that is the specified distance from the given index.
    func index(_ i: Index, offsetBy distance: Int) -> Index
    /// Returns an index that is the specified distance from the given index,
    /// unless that distance is beyond a given limiting index.
    func index(
      _ i: Index, offsetBy distance: Int, limitedBy limit: Index
    ) -> Index?
    /// Returns the distance between two indices.
    func distance(from start: Index, to end: Index) -> Int

    /// Returns the position immediately after the given index.
    func index(after i: Index) -> Index
    /// Replaces the given index with its successor.
    func formIndex(after i: inout Index)
}

/// An asynchronous sequence that can also nondestructively reference its
/// elements directly via index tokens and provide linear traversal of said
/// tokens in both the forward and backward directions.
protocol AsynchronousBidirectionalCollection: AsynchronousCollection {
    /// Returns the position immediately before the given index.
    func index(before i: Index) -> Index
    /// Replaces the given index with its predecessor.
    func formIndex(before i: inout Index)
}

/// An asynchronous sequence that can also nondestructively reference its
/// elements directly via index tokens and provide efficient random-access
/// traversal of said tokens.
protocol AsynchronousRandomAccessCollection: AsynchronousBidirectionalCollection {}

/// An asynchronous sequence that can also nondestructively reference its
/// elements for reading and writing directly via index tokens.
protocol AsynchronousMutableCollection: AsynchronousCollection, MutableCollective, MutablePersistentFiniteCollective {
    /// Accesses a contiguous subrange of the collection's elements for reading
    /// and writing.
    subscript(bounds: Range<Index>) -> SubSequence { get set }

    /// Reorders the elements of the collection such that all the elements that
    /// match the given predicate are after all the elements that don't match.
    mutating func partition(by belongsInSecondPartition: (Element) throws -> Bool) rethrows -> Index

    /// Moves the subsequence of elements before the given index to be after the
    /// subsequence of elements starting at the given index, preserving the
    /// relative order of elements within each partition.
    mutating func swapPartitions(across pivot: Index) -> Index
}

/// An asynchronous sequence that can add, remove, or replace elements while
/// nondestructively referencing those elements via index tokens.
protocol AsynchronousRangeReplaceableCollection: AsynchronousCollection, DeletingCollective {
    /// Creates a new, empty collection.
    init()
    /// Replaces the specified subrange of elements with the given collection.
    mutating func replaceSubrange<C: AsynchronousCollection>(
      _ subrange: Range<Index>,
      with newElements: __owned C
    ) async throws(C.AsynchronousIterator.Error) where C.Element == Element

    /// Creates a new collection containing the specified number of a single,
    /// repeated value.
    init(repeating repeatedValue: Element, count: Int)
    /// Creates a new instance of a collection containing the elements of a
    /// sequence.
    init<S: AsynchronousSequence>(_ elements: S)
      where S.Element == Element

    /// Adds an element to the end of the collection.
    mutating func append(_ newElement: __owned Element)
    /// Adds the elements of a sequence or collection to the end of this
    /// collection.
    mutating func append<S: AsynchronousSequence>(contentsOf newElements: __owned S)
      where S.Element == Element

    /// Inserts a new element into the collection at the specified position.
    mutating func insert(_ newElement: __owned Element, at i: Index)
    /// Inserts the elements of a sequence into the collection at the specified
    /// position.
    mutating func insert<S: AsynchronousCollection>(contentsOf newElements: __owned S, at i: Index)
      where S.Element == Element

    /// Removes and returns the element at the specified position.
    @discardableResult
    mutating func remove(at i: Index) -> Element
    /// Removes the specified subrange of elements from the collection.
    mutating func removeSubrange(_ bounds: Range<Index>)
    /// Removes and returns the first element of the collection.
    @discardableResult
    mutating func removeFirst() -> Element
    /// Removes the specified number of elements from the beginning of the
    /// collection.
    mutating func removeFirst(_ k: Int)
    /// Removes all the elements that satisfy the given predicate.
    mutating func removeAll(
      where shouldBeRemoved: (Element) throws -> Bool) rethrows
}

I have a PR up with a fix for that: "[SILGen] Branch on result of next() before conversion." by varungandhi-apple (apple/swift pull request #34979 on GitHub).


This looks really nice; just one question.

The functions that return a single value are especially interesting because they increase usability by changing a loop into a single await line. Functions in this category are first, contains, count, min, max, reduce, and more. Functions that return a new AsyncSequence include filter, map, and compactMap.

I'm having some trouble understanding how these would be used in practice because they consume the sequence, so e.g. if you want to get the number of lines in the file and then read in the file, it seems like you're better off manually iterating over the sequence and counting+collecting values into an Array. Or if you want to check if the sequence contains two values, you can't because you've already consumed part/all of it calling contains the first time.
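The concern can be reproduced with the standard library's single-pass `AsyncStream`: counting drains the stream, so a second loop sees nothing. The sketch below also shows the workaround described above, a single pass into an `Array` followed by as many queries as needed. `collectAll` and `makeLines` are hypothetical helpers, not standard-library APIs.

```swift
// Sketch using the single-pass `AsyncStream`.
// `collectAll` and `makeLines` are hypothetical helpers.
func collectAll<S: AsyncSequence>(_ source: S) async throws -> [S.Element] {
    var buffer: [S.Element] = []
    for try await element in source { buffer.append(element) }
    return buffer
}

func makeLines() -> AsyncStream<String> {
    AsyncStream { continuation in
        for line in ["alpha", "beta", "gamma"] { continuation.yield(line) }
        continuation.finish()
    }
}

// Counting first drains the stream, so a second pass sees nothing.
let lines = makeLines()
var counted = 0
for await _ in lines { counted += 1 }
var readAfterCounting: [String] = []
for await line in lines { readAfterCounting.append(line) }

// Workaround: one pass into an Array, then ask as many questions as needed.
let cached = try await collectAll(makeLines())
let hasBeta = cached.contains("beta")
let hasDelta = cached.contains("delta")
```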

Am I misunderstanding the use case for these, or have you considered a complementary multi-pass AsyncCollection?


I understand that concern, but there is a different and generally useful solution to that, which is to allow deinit on structs. This is (roughly, not fully committed of course) "Plan of Record" as part of the ownership proposal.

Your framing here is exactly why I brought this up: these are exactly what move semantics will provide to the library designer: elimination of ARC and providing the ability to capture these invariants into the type system. Once we have the ability to have move-only AsyncIterator types, this all becomes very easy to define and reason about.

It seems really unfortunate to introduce these special rules when proper language support is right around the corner.
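Since this discussion, Swift 5.9 added noncopyable structs that may declare `deinit` (SE-0390). The sketch below, assuming Swift 5.9 or later, shows cleanup running when an iterator-like value's lifetime ends after the consumer stops early, with no explicit `cancel()` call. `LineCursor` and `CloseLog` are hypothetical names, and the cursor deliberately does not conform to `AsyncIteratorProtocol`.

```swift
// Sketch: assumes Swift 5.9+ noncopyable types (SE-0390).
// `LineCursor` and `CloseLog` are hypothetical names.
final class CloseLog {
    var closes = 0
}

struct LineCursor: ~Copyable {
    private var remaining: [String]
    private let log: CloseLog

    init(lines: [String], log: CloseLog) {
        self.remaining = lines
        self.log = log
    }

    mutating func next() -> String? {
        remaining.isEmpty ? nil : remaining.removeFirst()
    }

    // Runs exactly once, when the unique value's lifetime ends, including
    // when the consumer stops early. `CloseLog` only records that it ran.
    deinit {
        log.closes += 1
    }
}

let closeLog = CloseLog()
do {
    var cursor = LineCursor(lines: ["a", "b", "c"], log: closeLog)
    while let line = cursor.next() {
        if line == "b" { break }  // stop early, like `break` in a for-in loop
    }
}  // by the time this scope exits, `cursor`'s deinit has run
```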

@John_McCall how do you expect move semantics to interact with protocols like this? I would expect that AsyncIteratorProtocol.next would have to borrow self, but it would otherwise "just work".

-Chris


Ok, I can see that argument, but the "stuff that runs each time" is a destructuring pattern, not imperative code. Destructuring patterns (unlike general patterns) cannot include expressions and other things in them, so the pattern itself being async doesn't make much sense.

On the other hand, the logic synthesized by the for loop does include async behavior (and propagates it outward to the enclosing function). I often think of the for/in loop as being a "hygienic macro hard-coded into the language". The way marking generally works is that the thing that can suspend or throw gets marked with await/try, and in this case it is the for loop that suspends, not the pattern.
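The "macro" view can be made concrete by writing the loop out by hand. The sketch below compares a `for try await` loop with the iterator-and-`while let` form it is commonly described as lowering to; the compiler's actual code generation is not shown, and the function names are illustrative. In the hand-written form, the only `await` is on the call to `next()`, while the `x` binding itself does nothing asynchronous.

```swift
// Sketch: a `for try await` loop next to the iterator form it is commonly
// described as lowering to. Function names are illustrative.
func sumWithLoop<S: AsyncSequence>(_ seq: S) async throws -> Int where S.Element == Int {
    var total = 0
    for try await x in seq { total += x }
    return total
}

func sumDesugared<S: AsyncSequence>(_ seq: S) async throws -> Int where S.Element == Int {
    var total = 0
    var iterator = seq.makeAsyncIterator()
    // The potential suspension is the call to `next()`, not binding `x`.
    while let x = try await iterator.next() {
        total += x
    }
    return total
}

func numbers() -> AsyncStream<Int> {
    AsyncStream { continuation in
        for n in 1...4 { continuation.yield(n) }
        continuation.finish()
    }
}

let viaLoop = try await sumWithLoop(numbers())
let viaDesugaring = try await sumDesugared(numbers())
```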

-Chris

I would expect that operations on iterator types would generally be exclusive borrows (i.e. mutating).

Thanks for writing out this idea @CTMacUser.

It seems that the core design point is that the 'collective' type is not just a collection but also a kind of lazy asynchronous buffer. I believe that, if we decided such a type is useful, we could build it on top of AsyncSequence in a manner like the existing protocol hierarchy.

I'd like to reiterate that a core idea of the AsyncSequence protocol is a focus on ease of adoption. That perhaps leaves something on the table compared with a complete set of protocols that can be arbitrarily composed, but I think the tradeoff toward simplicity is worth it.


Hi @jack,

You are correct that awareness of the cost of iterating the async collection has to be part of writing code that uses it.

I imagine the implementation of our hypothetical URL.lines() function above would iterate the contents of the URL each time you called for/in on it. This does not preclude the possibility of some lower level intelligent cache, nor does it preclude the possibility of introducing a buffering async collection type in the future.

I'm not sure if an API like lines would be written to return such a type, though, because it seems to require a buffer the size of the file whether you want it or not (and assumes a finite file size). It might be something we would optionally compose with a sequence-based lines API.

If I understand correctly (a big if), we can mark the cancel function as __consuming, which will allow us to more smoothly transition into move-only implementations of future AsyncIterators, when move only types are finished. I imagine we will need to figure out a transition story for all iterators at that time, including existing Sequence API, correct?


The main point of the design I gave is that AsyncCollective (or whatever the name will be) can't be built on top of AsyncSequence; the abstract sub-typing is actually the other way! (AS refines ACve by imposing requirements for a fixed publication order and said order being semantically significant.) ABI stability means that it can't be fixed later.

The thing I'm primarily concerned about is the language/compiler (in this case, the implementation of the for/in loop) knowing about this style of cancelation, not the protocol requirement. This notion of cancelation doesn't have anything to do with async: the same issue arises today when breaking out of a for-in loop over a Sequence iterator that has internal state.

What I'm observing here is that we have the ability to model this today for Sequences (it is just inefficient due to the extra class indirection) and that ownership will allow this to be efficient as well, all without extending the language or compiler. We have exactly the same situation with the async version of this, so it would be nice if we didn't have to add this wart to the compiler and language as a short-term workaround.

Of course, with the compiler/language support dropped, it follows that it wouldn't have to be part of the protocol at all either, further increasing consistency here.
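As one reading of "model this today for Sequences", here is a minimal sketch in which a plain `Sequence` iterator holds a class instance whose `deinit` performs cleanup, so breaking out of a `for`-`in` loop triggers it without any `cancel()` requirement. `ReleaseLog`, `CleanupToken`, and `TrackedNumbers` are hypothetical names; the class allocation is the "extra class indirection" being discussed.

```swift
// Sketch: cleanup via a class held by a plain `Sequence` iterator.
// `ReleaseLog`, `CleanupToken`, and `TrackedNumbers` are hypothetical names.
final class ReleaseLog {
    var releases = 0
}

/// The extra class indirection: its deinit is the cleanup hook.
final class CleanupToken {
    private let log: ReleaseLog
    init(log: ReleaseLog) { self.log = log }
    deinit { log.releases += 1 }  // e.g. close a file handle here
}

struct TrackedNumbers: Sequence {
    let limit: Int
    let log: ReleaseLog

    struct Iterator: IteratorProtocol {
        var current = 0
        let limit: Int
        let token: CleanupToken  // shared by copies; released with the last one

        mutating func next() -> Int? {
            guard current < limit else { return nil }
            defer { current += 1 }
            return current
        }
    }

    func makeIterator() -> Iterator {
        Iterator(limit: limit, token: CleanupToken(log: log))
    }
}

let releaseLog = ReleaseLog()
for n in TrackedNumbers(limit: 100, log: releaseLog) {
    if n == 2 { break }  // leave early; no explicit cancel() is called
}
```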

-Chris


Related to cancel/deinit:

Looking at the .NET equivalent to this, they have both a cancellation token and DisposeAsync. How does this design compare to that?

  1. If the cancel here could potentially be modelled as a deinit, is that equivalent to .NET's Dispose? Should cancel be cancel() async?

  2. Is there an equivalent to the cancellation token, where the source of the items going into the AsyncSequence can be stopped? E.g., cancelling the network request from a separate thread/actor.
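One possible point of comparison, not an answer from the proposal authors: in Swift's structured concurrency, the nearest analogue to a cancellation token is the current task's cancellation flag, which another task can set by calling `cancel()` on a `Task` handle. In this sketch, the hypothetical `PagedNumbers` iterator checks `Task.isCancelled` cooperatively and ends the sequence once its consuming task has been cancelled.

```swift
// Sketch: task cancellation as a cooperative "cancellation token".
// `PagedNumbers` is a hypothetical sequence.
struct PagedNumbers: AsyncSequence {
    typealias Element = Int
    let limit: Int

    struct AsyncIterator: AsyncIteratorProtocol {
        var produced = 0
        let limit: Int

        mutating func next() async -> Int? {
            // Cancellation is requested elsewhere and observed here.
            guard !Task.isCancelled, produced < limit else { return nil }
            produced += 1
            return produced
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(limit: limit)
    }
}

let consumer = Task { () async -> Int in
    // Returns early (its error discarded by `try?`) once the task is cancelled.
    try? await Task.sleep(nanoseconds: 1_000_000_000)
    var seen = 0
    for await _ in PagedNumbers(limit: 50) { seen += 1 }
    return seen
}
consumer.cancel()  // requested from outside the consuming task
let seenAfterCancel = await consumer.value

var seenUncancelled = 0
for await _ in PagedNumbers(limit: 3) { seenUncancelled += 1 }
```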

I think this would be a great addition to Swift. I've previously used for-await-of in JavaScript/Node, with IxJS to add collection operations to them and found it a very clean way to work with long streams of data without exhausting memory.

One issue that has come up in Node with this approach is the overhead of awaiting each iteration of the loop (see here for an example). Would Swift have the same problem? My guess is that it probably doesn't, because await only suspends if the thing generating the responses has to suspend, but it would be good to have that confirmed. One concrete example of this would be consuming a paged web service API. You would only have to suspend every 50 (for example) items in the list to wait for the next page of results.
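The paged-API case can be sketched as an iterator that buffers one page and only awaits real work when that page is used up. `fetchPage` is a hypothetical stand-in for a network request (here it returns immediately). Whether a call to `next()` that is served from the buffer incurs any scheduling overhead is a runtime detail this sketch does not measure; it only shows that the expensive `await` happens once per page.

```swift
// Sketch of a buffered, paged async sequence.
// `fetchPage` is a hypothetical stand-in for a network request.
func fetchPage(_ page: Int, size: Int) async -> [Int] {
    Array((page * size)..<((page + 1) * size))
}

struct PagedFeed: AsyncSequence {
    typealias Element = Int
    let pageSize: Int
    let pageCount: Int

    struct AsyncIterator: AsyncIteratorProtocol {
        let pageSize: Int
        let pageCount: Int
        var buffer: [Int] = []
        var position = 0
        var nextPage = 0

        mutating func next() async -> Int? {
            // Refill only when the current page is used up, i.e. once per
            // `pageSize` elements; other calls are served from `buffer`.
            while position == buffer.count {
                guard nextPage < pageCount else { return nil }
                buffer = await fetchPage(nextPage, size: pageSize)
                position = 0
                nextPage += 1
            }
            defer { position += 1 }
            return buffer[position]
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(pageSize: pageSize, pageCount: pageCount)
    }
}

var received: [Int] = []
for await item in PagedFeed(pageSize: 50, pageCount: 3) { received.append(item) }
```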

I think this is a great suggestion, and I hope it does not go overlooked. It seems like a classic example of Swift taking existing ideas and removing the unnecessary legacy cruft. The word await does not yet have any preconceived meaning in Swift, so await ... in still seems like a natural construct.

From my reading of this, it seems that the way this would work is to wait for the first item in the sequence, then the second, then the third, and so on. Suppose I have an array of URLs, and I want to fetch the contents of them and turn them into Strings, and then perform some task with the resulting array of strings. Is there a plan for a way to iterate over these where each item can be performed concurrently? In this case I wouldn't care the order in which requests start or finish (though I would want the final array to be in the same order as the original).

Something like this:

let array: [URL] = ...

let result = await array.asyncMap { await fetchString(forURL: $0) }

// do something with result
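One way to get this behavior with existing concurrency primitives is a task group that starts every transform concurrently and then restores input order. `concurrentMap` is a hypothetical name and this is only a sketch, not part of the AsyncSequence proposal; in the usage, later inputs sleep for less time so they tend to finish first, yet the result keeps the original order.

```swift
// Sketch: order-preserving concurrent map with a task group.
// `concurrentMap` is a hypothetical name.
extension Sequence where Element: Sendable {
    func concurrentMap<T: Sendable>(
        _ transform: @escaping @Sendable (Element) async -> T
    ) async -> [T] {
        await withTaskGroup(of: (Int, T).self) { group in
            // Start one child task per element; they may finish in any order.
            for (offset, element) in self.enumerated() {
                group.addTask { (offset, await transform(element)) }
            }
            // Gather results as they complete, then restore the input order.
            var indexed: [(Int, T)] = []
            for await pair in group { indexed.append(pair) }
            return indexed.sorted { $0.0 < $1.0 }.map { $0.1 }
        }
    }
}

let squares = await [1, 2, 3, 4].concurrentMap { (n: Int) async -> Int in
    // Later inputs sleep less, so they tend to finish first.
    try? await Task.sleep(nanoseconds: UInt64(5 - n) * 1_000_000)
    return n * n
}
```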

Late to the party here, but I just wanted to give a big +1 to the concerns @Chris_Lattner3 is raising; this seems to desperately call for move-only types, and compromising the language/library (especially if it must be binary stable indefinitely into the future) seems somewhat short-sighted.

Thank you everyone for your hard work!

Maybe it's worth mentioning these names in the Alternatives Considered section.

Overall I really like this proposal. I would just like some clarifications. Firstly, I noticed you included the methods append(_:) and prepend(_:), which, if I'm not mistaken, are not part of today's Sequence protocol; is there a particular reason for this choice? Also, a lazy sequence's compactMap(_:) method currently has a return type LazyMapSequence<..., ElementOfResult> *, whereas in AsyncSequence it returns AsyncCompactMapSequence. I wonder if there's a reason for this or if it was done to fit in the table.

* The full return type is: LazyMapSequence<LazyFilterSequence<LazyMapSequence<Elements, ElementOfResult?>>, ElementOfResult>

The generic signatures of the return types were elided for brevity. AsyncCompactMapSequence would have a generic signature of AsyncCompactMapSequence<Upstream: AsyncSequence, Transformed>.

I am not sure compactMap in terms of async sequences can be expressed as a composition of map + filter + map. Lazy and Async have similarities but are not 1-to-1 mappings.
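For comparison with the nested lazy type quoted earlier, here is a short sketch using the standard library's `compactMap` on an async sequence (available with Swift 5.5 concurrency). The explicit type annotation shows that the result is a single `AsyncCompactMapSequence` wrapping the upstream sequence, and that `nil` results from the transform are dropped.

```swift
// Sketch: `compactMap` on an async sequence yields one dedicated wrapper type.
let rawInput = AsyncStream<String> { continuation in
    for token in ["1", "two", "3", "four", "5"] { continuation.yield(token) }
    continuation.finish()
}

// The transform runs once per element; `nil` results are skipped.
let parsed: AsyncCompactMapSequence<AsyncStream<String>, Int> =
    rawInput.compactMap { Int($0) }

var numbersSeen: [Int] = []
for await n in parsed { numbersSeen.append(n) }
```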
