[Concurrency] AsyncSequence

You can even imagine, in a hypothetical Swift where for was an expression, that this would be valid:

let x = try await for try await line in try await makeURL().lines() {
  // ...
}

which would mean "await the result of this for loop, which itself awaits each item in the AsyncSequence produced by awaiting makeURL().lines()".

Not that we should actually do that, but it illustrates that all three possible places to put the keywords are semantically meaningful, and that they don't mean the same thing.
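In current Swift, `for` is a statement rather than an expression, so only two of the three positions can be written directly; the outer one can be approximated by wrapping the loop in a function. A minimal sketch (Swift 5.7 or later, for the top-level `await`), assuming a hypothetical `makeLines()` that stands in for `makeURL().lines()` and returns an `AsyncThrowingStream`. Note that Swift requires `try` to come before `await`:

```swift
// Hypothetical stand-in for `makeURL().lines()`: producing the sequence is
// itself an async, throwing step.
func makeLines() async throws -> AsyncThrowingStream<String, Error> {
    AsyncThrowingStream { continuation in
        for line in ["alpha", "beta", "gamma"] {
            continuation.yield(line)
        }
        continuation.finish()
    }
}

// Emulates "the loop as an expression": the function's result is what the
// caller awaits.
func countLines() async throws -> Int {
    var count = 0
    // `try await` after `in`: awaiting the call that produces the sequence.
    // `for try await`: awaiting (and possibly throwing on) each element.
    for try await _ in try await makeLines() {
        count += 1
    }
    return count
}

// `try await` on the outside: awaiting the result of the whole loop.
let x = try await countLines()
```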

I sympathize with concerns elsewhere in the thread about keyword soup, but it's useful to remember that every piece of it is independently meaningful, and the reason there are so many of them in a small space is that we're packing an enormous amount of functionality into it.

Don't forget flatMap.

Thanks, that should be in the list.

Before I read any of the other replies, I felt I had to respond about the “Sets and Dictionaries aren’t really Sequences” problem. Some of the other replies hinted at the problem. It’s caused by Sequence being over-specified. The for-in construct does not require the order in which the source sequence vends its elements to be part of that type’s semantics, but (almost) all of Sequence’s methods make that assumption. I’ll probably make a list of related threads when I’m typing on my Mac (instead of this iPad).

The solution is to make a new protocol like Sequence whose semantics drop the requirement that vending order is meaningful. Technically, in terms of computer science theory, it could be a base protocol for Sequence, but ABI stability forbids inserting new base protocols, even if care is taken to preserve source stability.

You’re suggesting something similar to Sequence, so the same concerns apply. But it’s new, so we can add the base protocol(s) now. I know it’s more of a concern for computer science nerds and probably 80% of users won’t need it, but I’m suggesting base protocols, which can’t be added later. This happened with AdditiveArithmetic, which was inserted as a base protocol of Numeric right before ABI stability. I’ll come up with an outline after I wake up.
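The AdditiveArithmetic case illustrates why a base protocol is valuable: generic code written against the weaker protocol works for every Numeric type and also for types that can only be added, not multiplied. A small sketch, where `total` and `Money` are hypothetical names chosen for illustration:

```swift
// Generic code written against the base protocol needs only `+` and `.zero`.
func total<T: AdditiveArithmetic>(_ values: [T]) -> T {
    values.reduce(.zero, +)
}

// A hypothetical type that can be added and subtracted but not multiplied,
// so it conforms to AdditiveArithmetic without conforming to Numeric.
// Equatable (required by AdditiveArithmetic) is synthesized.
struct Money: AdditiveArithmetic {
    var cents: Int

    static var zero: Money { Money(cents: 0) }

    static func + (lhs: Money, rhs: Money) -> Money {
        Money(cents: lhs.cents + rhs.cents)
    }

    static func - (lhs: Money, rhs: Money) -> Money {
        Money(cents: lhs.cents - rhs.cents)
    }
}

// Every Numeric type is also AdditiveArithmetic, so both calls compile.
let intTotal = total([1, 2, 3])
let moneyTotal = total([Money(cents: 150), Money(cents: 250)])
```

Had the base protocol been inserted after ABI stability, code like `total` could not have been retrofitted onto the existing Numeric hierarchy.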

Oh, is there a reason the new sequence protocol calls its associated iterator type “AsyncIterator”? Unless you think types can simultaneously conform to AsyncSequence and Sequence, you could reuse the “Iterator” name.

With respect to AsyncIterator, we think there is no good reason to reuse the associated type name and basically prohibit something adopting both (or require some new language feature to disambiguate).
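Keeping the names distinct means one type can adopt both protocols without any disambiguation feature. A minimal sketch, assuming the requirement names of the API as it later shipped in Swift 5.5 (`makeAsyncIterator()` and `AsyncIterator`, SE-0298); `Countdown` is a hypothetical type:

```swift
// One type that is both a synchronous Sequence and an AsyncSequence.
// `Iterator`/`makeIterator()` and `AsyncIterator`/`makeAsyncIterator()`
// never collide, so no new language feature is needed.
struct Countdown: Sequence, AsyncSequence {
    typealias Element = Int
    let start: Int

    struct Iterator: IteratorProtocol {
        var current: Int
        mutating func next() -> Int? {
            guard current > 0 else { return nil }
            defer { current -= 1 }
            return current
        }
    }

    struct AsyncIterator: AsyncIteratorProtocol {
        var base: Iterator
        // Reuses the synchronous logic; a real source would suspend here.
        mutating func next() async -> Int? { base.next() }
    }

    func makeIterator() -> Iterator { Iterator(current: start) }
    func makeAsyncIterator() -> AsyncIterator { AsyncIterator(base: makeIterator()) }
}
```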

It’s off topic so we shouldn’t debate it here (we can move this sub-thread to a new discussion topic if more discussion is desired), but this is not true. Nowhere do the docs for Sequence specify ordering. Given that sequences aren’t even guaranteed to be multi-pass, you can’t infer that the order has any meaning beyond the fact that, if you step through something one element at a time, the stepping must of course happen in some order.

And there’s a precedent from JS where for await (const element of generator()) { ... } is the syntax for iterating over an async generator.

I'm the last to defend Sequence (or the whole hierarchy of collection protocols ;-), and if it were my decision, I wouldn't consider adding AsyncSequence. But as far as I can see, this concept is much more "sequential" than the precedent (which clashes heavily with long-established definitions):
Asking a set for a prefix doesn't make sense, but every example of AsyncSequence I can think of has a meaningful order. Because elements arrive one after another, their timing defines that order, and you can build on top of this premise.

I'm glad not to see some problematic Sequence methods like reversed in the list, but I'd also remove count().
Also, I would at least hold off on including the protocol until it is possible to define getters that may throw: it would be unfortunate if first were (and stayed!) a method just because of this limitation.
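Throwing and async getters later became possible with effectful read-only properties (SE-0310, Swift 5.5), so a "first element" accessor can be a property. A sketch; the name `firstElement` is hypothetical, chosen to avoid suggesting it is a standard-library API:

```swift
extension AsyncSequence {
    // A read-only property whose getter both suspends and may throw,
    // because it has to pull the first element from the iterator.
    var firstElement: Element? {
        get async throws {
            var iterator = makeAsyncIterator()
            return try await iterator.next()
        }
    }
}
```

Call sites then read as `try await sequence.firstElement`, with no parentheses.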

I'll start with the core protocols, with some documentation added.

/// A generator for a possibly-infinite (multi-)set of values, each vended
/// asynchronously.
protocol AsynchronousIteratorProtocol {
    /// The type of objects vended by the iterator.
    associatedtype Element
    /// The type of errors possibly thrown by `next()`.
    associatedtype Error: Swift.Error

    /// Returns a not-yet-vended element from the virtual set; or `nil` if that
    /// set has been exhausted, a previous call threw an error, or vending has
    /// been either canceled or otherwise terminated.
    mutating func next() async throws(Error) -> Element?
    /// Moves the instance's iteration state into termination; the runtime
    /// calls this whenever the surrounding context ends iteration early.
    mutating func cancel()
}

/// A coupler between a possibly-infinite (multi-)set of values and a `for`-`in`
/// loop that can visit all the members asynchronously.
protocol AsynchronousCollective {
    /// The type of objects vended by the collective.
    associatedtype Element
    /// The generator that actually provides access to the elements.
    associatedtype AsynchronousIterator: AsynchronousIteratorProtocol where AsynchronousIterator.Element == Element

    /// Returns an iterator that can asynchronously visit every element of the
    /// collective, each exactly as many times as its multiplicity.
    func makeIterator() -> AsynchronousIterator

    /// Returns whether the given value is a member of the virtual set, or `nil`
    /// if either that determination can't be optimized to be superior to linear
    /// time or if `Element` doesn't conform to `Equatable`.
    func _customContainsEquatableElement(_ element: Element) async throws(AsynchronousIterator.Error) -> Bool?

    /// Copies as many elements as possible into the given buffer, returning a
    /// continuation point for the longer operand.
    func _copyContents(initializing pointer: UnsafeMutableBufferPointer<Element>) async throws(AsynchronousIterator.Error) -> (;AsynchronousIterator, UnsafeMutableBufferPointer<Element>.Index;)
}

Here, throws(Whatever) is a typed-throws specifier. A non-throwing method would use Never as the error type, and a general throwing method would use Swift.Error as the error type (since it self-conforms). The "(; Whatever1, Whatever2 ;)" syntax is for sum-type tuples, i.e. an alternative to the Either type used in several functional languages.
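Typed throws later shipped in Swift 6 (SE-0413) with this `throws(E)` spelling, including `throws(Never)` for non-throwing code. The sum-type tuple is not part of Swift, but an ordinary enum expresses the same idea. A sketch for Swift 6 or later, where `ParseError`, `parseCount`, `fixedCount`, `Either`, and `copyPrefix` are hypothetical names:

```swift
enum ParseError: Error {
    case notANumber(String)
}

// `throws(ParseError)`: callers know the only error this can throw.
func parseCount(_ text: String) throws(ParseError) -> Int {
    guard let value = Int(text) else { throw ParseError.notANumber(text) }
    return value
}

// `throws(Never)` is equivalent to not throwing; callers need no `try`.
func fixedCount() throws(Never) -> Int { 42 }

// An enum standing in for the hypothetical `(; A, B ;)` sum-type tuple.
enum Either<Left, Right> {
    case left(Left)
    case right(Right)
}

// Toy version of the `_copyContents` idea: report a continuation point for
// whichever operand is longer.
func copyPrefix<T>(_ source: [T], intoCapacity capacity: Int) -> Either<ArraySlice<T>, Int> {
    if source.count > capacity {
        return .left(source[capacity...]) // source was longer: leftover elements
    } else {
        return .right(source.count)       // buffer was longer: next free slot
    }
}
```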

There are several axes along which an asynchronous collective may be extended: finite vs. infinite, single- vs. multi-pass, whether publication order is important, whether elements have indexed access, whether elements are mutable, and whether elements can be rearranged.

/// An asynchronous collective whose virtual multi-set of elements is
/// finite, meaning that all iterators that don't get canceled can call `next()`
/// only a finite number of times until `nil` gets returned.
protocol FiniteCollective: AsynchronousCollective {
    /// A lower bound for the number of elements that will be vended if a
    /// returned iterator is not canceled, calculated nondestructively.
    var underestimatedCount: Int { get }

    /// Confirms whether the collective has no elements, possibly destructively.
    func checksAsEmpty() async throws(AsynchronousIterator.Error) -> Bool
    /// Counts the number of elements in the collective, possibly destructively.
    func postCount() async throws(AsynchronousIterator.Error) -> Int

    /// Copies all the elements into a native array buffer, maintaining the order
    /// vended.
    __consuming func _copyToContiguousArray() async throws(AsynchronousIterator.Error) -> ContiguousArray<Element>
}

/// An asynchronous collective where the order that its elements are vended will
/// be part of the semantics, and said order is stable for a given state of the
/// virtual multi-set.
protocol AsynchronousSequence: AsynchronousCollective {}

/// An asynchronous collective that can support multiple passes, via iterators
/// that can mutate their iteration state without corrupting other instances or
/// the collective.
protocol ReusableCollective: AsynchronousCollective {}

/// An asynchronous collective with even stronger support for multiple passes,
/// via iterators that are safe from corruption even if the collective is later
/// mutated.
protocol ReusablePureCollective: ReusableCollective {}

/// An asynchronous collective that vends its elements in the same order from
/// every returned iterator instance (as long as the virtual multi-set isn't
/// mutated).
protocol PersistentCollective: ReusableCollective {}

/// An asynchronous collective that can also nondestructively reference its
/// elements directly via index tokens.
protocol IndexedCollective: ReusableCollective {
    /// A token to directly access an element.
    associatedtype Index: Equatable
    /// A set of tokens for direct access to all the elements.
    associatedtype Indices: IndexedCollective where Indices.Element == Index, Indices.Index == Index

    /// Accesses the element that the given token points to.
    subscript(position: Index) -> Element { get }
    /// A collective providing all of this collective's elements' tokens.
    var indices: Indices { get }

    /// Returns the index of some element of the collective that equals the
    /// given value, `.some(nil)` if no element matches the value, or `nil` if
    /// either that determination can't be optimized to be superior to linear
    /// time or if `Element` doesn't conform to `Equatable`.
    func _customIndexOfEquatableElement(_ element: Element) async throws(Indices.AsynchronousIterator.Error) -> Index??
}

/// An asynchronous collective that can mutate its elements.
protocol MutableCollective: IndexedCollective {
    /// Accesses the element that the given token points to, for both read and
    /// write.
    subscript(position: Index) -> Element { get set }

    /// Exchanges the values at the specified indices of the collective.
    mutating func swapAt(_ i: Index, _ j: Index)
}

/// An asynchronous collective that can remove arbitrary elements or reserve
/// space for new ones.
protocol DeletingCollective: IndexedCollective, FiniteCollective where Indices: FiniteCollective {
    /// The type of errors possibly thrown by `remove(along:)`.
    associatedtype RemovalError: Swift.Error

    /// Prepares, as well as possible, the collective to store the given number
    /// of elements.
    mutating func reserveCapacity(_ amount: Int)

    /// Removes the elements at the given indices.
    mutating func remove<C: AsynchronousCollective>(along: C) async throws(RemovalError) where C.Element == Index

    /// Removes every element from the collective.
    mutating func removeAll(keepingCapacity: Bool)
}

You can generally just compose protocols to get the capabilities you need. I didn't include methods like forEach or map; I'll figure those out tomorrow. Some combinations of protocols enable additional customization points.

/// An asynchronous collective that may cache its elements in a memory buffer.
protocol PersistentFiniteCollective: PersistentCollective, FiniteCollective {
    /// Returns the result of applying the given closure to the in-memory buffer
    /// caching the elements of this collective, either creating the buffer if
    /// necessary or returning `nil` instead.
    func withContiguousStorageIfAvailable<R>(
        _ body: (UnsafeBufferPointer<Element>) throws -> R
    ) rethrows -> R?
}

/// An asynchronous collective that can mutate its elements through a buffer.
protocol MutablePersistentFiniteCollective: PersistentFiniteCollective {
    /// Returns the result of applying the given closure to the in-memory
    /// writable buffer containing the elements of this collective, either
    /// creating the buffer if necessary or returning `nil` instead.
    mutating func withContiguousMutableStorageIfAvailable<R>(
        _ body: (inout UnsafeMutableBufferPointer<Element>) throws -> R
    ) rethrows -> R?
}

Including, of course, an adaptation of the Collection hierarchy.

/// An asynchronous sequence that can also nondestructively reference its
/// elements directly via index tokens and provide linear traversal of said
/// tokens.
protocol AsynchronousCollection
  : AsynchronousSequence, PersistentFiniteCollective, IndexedCollective
   where Index: Comparable, Indices: AsynchronousCollection, Indices.SubSequence == Indices, Indices.AsynchronousIterator.Error == Never
{
    /// The type representing contiguous subsequences (*i.e.* substrings) of the
    /// collection.
    associatedtype SubSequence: AsynchronousCollection where SubSequence.Element == Element, SubSequence.Index == Index, SubSequence.SubSequence == SubSequence

    /// The position of the first element in a nonempty collection.
    var startIndex: Index { get }
    /// The collection's "past the end" position—that is, the position one
    /// greater than the last valid subscript argument.
    var endIndex: Index { get }

    /// Accesses a contiguous subrange of the collection's elements.
    subscript(bounds: Range<Index>) -> SubSequence { get }

    /// Returns the index of the first element of the collection that equals the
    /// given value, `.some(nil)` if no element matches the value, or `nil` if
    /// either that determination can't be optimized to be superior to linear
    /// time or if `Element` doesn't conform to `Equatable`.
    func _customFirstIndexOfEquatableElement(_ element: Element) async -> Index??
    /// Returns the index of the last element of the collection that equals the
    /// given value, `.some(nil)` if no element matches the value, or `nil` if
    /// either that determination can't be optimized to be superior to linear
    /// time or if `Element` doesn't conform to `Equatable`.
    func _customLastIndexOfEquatableElement(_ element: Element) async -> Index??

    /// Returns an index that is the specified distance from the given index.
    func index(_ i: Index, offsetBy distance: Int) -> Index
    /// Returns an index that is the specified distance from the given index,
    /// unless that distance is beyond a given limiting index.
    func index(
      _ i: Index, offsetBy distance: Int, limitedBy limit: Index
    ) -> Index?
    /// Returns the distance between two indices.
    func distance(from start: Index, to end: Index) -> Int

    /// Returns the position immediately after the given index.
    func index(after i: Index) -> Index
    /// Replaces the given index with its successor.
    func formIndex(after i: inout Index)
}

/// An asynchronous sequence that can also nondestructively reference its
/// elements directly via index tokens and provide linear traversal of said
/// tokens in both the forward and backward directions.
protocol AsynchronousBidirectionalCollection: AsynchronousCollection {
    /// Returns the position immediately before the given index.
    func index(before i: Index) -> Index
    /// Replaces the given index with its predecessor.
    func formIndex(before i: inout Index)
}

/// An asynchronous sequence that can also nondestructively reference its
/// elements directly via index tokens and provide efficient random-access
/// traversal of said tokens.
protocol AsynchronousRandomAccessCollection: AsynchronousBidirectionalCollection {}

/// An asynchronous sequence that can also nondestructively reference its
/// elements for reading and writing directly via index tokens.
protocol AsynchronousMutableCollection: AsynchronousCollection, MutableCollective, MutablePersistentFiniteCollective {
    /// Accesses a contiguous subrange of the collection's elements for reading
    /// and writing.
    subscript(bounds: Range<Index>) -> SubSequence { get set }

    /// Reorders the elements of the collection such that all the elements that
    /// match the given predicate are after all the elements that don't match.
    mutating func partition(by belongsInSecondPartition: (Element) throws -> Bool) rethrows -> Index

    /// Moves the subsequence of elements before the given index to be after the
    /// subsequence of elements starting at the given index, preserving the
    /// relative order of elements within each partition.
    mutating func swapPartitions(across pivot: Index) -> Index
}

/// An asynchronous sequence that can add, remove, or replace elements while
/// nondestructively referencing those elements via index tokens.
protocol AsynchronousRangeReplaceableCollection: AsynchronousCollection, DeletingCollective {
    /// Creates a new, empty collection.
    init()
    /// Replaces the specified subrange of elements with the given collection.
    mutating func replaceSubrange<C: AsynchronousCollection>(
      _ subrange: Range<Index>,
      with newElements: __owned C
    ) async throws(C.AsynchronousIterator.Error) where C.Element == Element

    /// Creates a new collection containing the specified number of a single,
    /// repeated value.
    init(repeating repeatedValue: Element, count: Int)
    /// Creates a new instance of a collection containing the elements of a
    /// sequence.
    init<S: AsynchronousSequence>(_ elements: S)
      where S.Element == Element

    /// Adds an element to the end of the collection.
    mutating func append(_ newElement: __owned Element)
    /// Adds the elements of a sequence or collection to the end of this
    /// collection.
    mutating func append<S: AsynchronousSequence>(contentsOf newElements: __owned S)
      where S.Element == Element

    /// Inserts a new element into the collection at the specified position.
    mutating func insert(_ newElement: __owned Element, at i: Index)
    /// Inserts the elements of a sequence into the collection at the specified
    /// position.
    mutating func insert<S: AsynchronousCollection>(contentsOf newElements: __owned S, at i: Index)
      where S.Element == Element

    /// Removes and returns the element at the specified position.
    @discardableResult
    mutating func remove(at i: Index) -> Element
    /// Removes the specified subrange of elements from the collection.
    mutating func removeSubrange(_ bounds: Range<Index>)
    /// Removes and returns the first element of the collection.
    @discardableResult
    mutating func removeFirst() -> Element
    /// Removes the specified number of elements from the beginning of the
    /// collection.
    mutating func removeFirst(_ k: Int)
    /// Removes all the elements that satisfy the given predicate.
    mutating func removeAll(
      where shouldBeRemoved: (Element) throws -> Bool) rethrows
}
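To illustrate composing the capability protocols, here is a minimal sketch with simplified, hypothetical stand-ins (`CollectiveSketch`, `FiniteSketch`, `ReusableSketch`, `ArrayCollective`) that reuse the standard `AsyncIteratorProtocol` instead of the typed-throws iterator above. The algorithm's constraint `FiniteSketch & ReusableSketch` states exactly what it relies on:

```swift
// Simplified stand-ins for the proposed protocols (hypothetical names).
protocol CollectiveSketch {
    associatedtype Element
    associatedtype Iterator: AsyncIteratorProtocol where Iterator.Element == Element
    func makeIterator() -> Iterator
}
protocol FiniteSketch: CollectiveSketch {}   // marker: iteration terminates
protocol ReusableSketch: CollectiveSketch {} // marker: passes are independent

// Counting and then re-reading needs both capabilities: finite (so the count
// terminates) and reusable (so the second pass sees every element again).
func countThenCollect<C: FiniteSketch & ReusableSketch>(
    _ collective: C
) async throws -> (count: Int, elements: [C.Element]) {
    var count = 0
    var firstPass = collective.makeIterator()
    while try await firstPass.next() != nil { count += 1 }

    var elements: [C.Element] = []
    var secondPass = collective.makeIterator()
    while let element = try await secondPass.next() { elements.append(element) }
    return (count, elements)
}

// An array-backed conformer: each iterator gets its own copy of the slice.
struct ArrayCollective<Element>: FiniteSketch, ReusableSketch {
    var storage: [Element]

    struct Iterator: AsyncIteratorProtocol {
        var remaining: ArraySlice<Element>
        mutating func next() async -> Element? { remaining.popFirst() }
    }

    func makeIterator() -> Iterator { Iterator(remaining: storage[...]) }
}
```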

I have a PR up with a fix for that: https://github.com/apple/swift/pull/34979

This looks really nice, just one question.

The functions that return a single value are especially interesting because they increase usability by changing a loop into a single await line. Functions in this category are first, contains, count, min, max, reduce, and more. Functions that return a new AsyncSequence include filter, map, and compactMap.

I'm having some trouble understanding how these would be used in practice, because they consume the sequence. For example, if you want to get the number of lines in the file and then read in the file, it seems like you're better off manually iterating over the sequence and counting and collecting values into an Array. Or if you want to check whether the sequence contains two values, you can't, because you've already consumed part or all of it calling contains the first time.

Am I misunderstanding the use case for these, or have you considered a complementary multi-pass AsyncCollection?
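The concern can be reproduced with a single-pass sequence whose iterators share one cursor. A sketch, where `Cursor`, `OneShot`, `queryTwice`, and `queryBuffered` are hypothetical; it relies on the standard `contains(_:)` on AsyncSequence, which stops at the first match, so a second query only sees what is left:

```swift
// Shared iteration state: every iterator continues where the last one stopped.
final class Cursor {
    var position = 0
}

struct OneShot: AsyncSequence {
    typealias Element = Int
    let values: [Int]
    let cursor = Cursor()

    struct AsyncIterator: AsyncIteratorProtocol {
        let values: [Int]
        let cursor: Cursor
        mutating func next() async -> Int? {
            guard cursor.position < values.count else { return nil }
            defer { cursor.position += 1 }
            return values[cursor.position]
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(values: values, cursor: cursor)
    }
}

func queryTwice() async -> (Bool, Bool) {
    let lines = OneShot(values: [1, 2, 3, 4])
    let hasThree = await lines.contains(3) // true; consumes 1, 2, 3
    let hasTwo = await lines.contains(2)   // false; only 4 was left
    return (hasThree, hasTwo)
}

// The workaround described above: drain once into an Array, then query freely.
func queryBuffered() async -> (Bool, Bool) {
    let lines = OneShot(values: [1, 2, 3, 4])
    var buffer: [Int] = []
    for await value in lines { buffer.append(value) }
    return (buffer.contains(3), buffer.contains(2))
}
```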

I understand that concern, but there is a different and generally useful solution to that, which is to allow deinit on structs. This is (roughly, not fully committed of course) "Plan of Record" as part of the ownership proposal.

Your framing here is exactly why I brought this up: this is precisely what move semantics will provide to the library designer, namely the elimination of ARC and the ability to capture these invariants in the type system. Once we have the ability to have move-only AsyncIterator types, this all becomes very easy to define and reason about.

It seems really unfortunate to introduce these special rules when proper language support is right around the corner.
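Deinitializers on structs later arrived for noncopyable types (SE-0390, Swift 5.9), which lets an iterator clean up when it goes out of scope without a class indirection. A minimal sketch; `CloseFlag`, `LineReader`, and `readFirstLine` are hypothetical, and the array stands in for an open file:

```swift
// Records whether cleanup ran, so the effect is observable.
final class CloseFlag {
    var closed = false
}

// A noncopyable iterator: it has a single owner, so its deinit runs exactly
// once, when that owner's use of it ends.
struct LineReader: ~Copyable {
    private var remaining: [String]
    private let flag: CloseFlag

    init(lines: [String], flag: CloseFlag) {
        self.remaining = lines
        self.flag = flag
    }

    // An exclusive (mutating) borrow, like IteratorProtocol.next().
    mutating func next() -> String? {
        remaining.isEmpty ? nil : remaining.removeFirst()
    }

    deinit {
        flag.closed = true // e.g. close the underlying file handle here
    }
}

// Stops after one line, as a `break` out of a loop would.
func readFirstLine(flag: CloseFlag) -> String? {
    var reader = LineReader(lines: ["first", "second"], flag: flag)
    return reader.next()
}
```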

@John_McCall how do you expect move semantics to interact with protocols like this? I would expect that AsyncIteratorProtocol.next would have to borrow self, but it would otherwise "just work".

-Chris

Ok, I can see that argument, but the "stuff that runs each time" is a destructuring pattern, not imperative code. Destructuring patterns (unlike general patterns) cannot include expressions and other things in them, so the pattern itself being async doesn't make much sense.

On the other hand, the logic synthesized by the for loop does include async behavior (and propagates it outward to the enclosing function). I often think of the for/in loop as being a "hygienic macro hard-coded into the language". The way marking generally works is that the "thing that can suspend/throw" gets marked with await/try, and in this case it's the for loop that suspends, not the pattern.

-Chris
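Viewing for/in as a built-in macro, the loop expands roughly into an explicit iterator loop, which shows that the suspension point is the call to `next()`, while the pattern only destructures the result. A sketch of that expansion, assuming the `makeAsyncIterator()`/`next()` names the API later shipped with; `Ticks`, `joinedWithLoop`, and `joinedByHand` are hypothetical:

```swift
struct Ticks: AsyncSequence {
    typealias Element = (index: Int, label: String)
    let labels: [String]

    struct AsyncIterator: AsyncIteratorProtocol {
        let labels: [String]
        var index = 0
        mutating func next() async -> (index: Int, label: String)? {
            guard index < labels.count else { return nil }
            defer { index += 1 }
            return (index, labels[index])
        }
    }

    func makeAsyncIterator() -> AsyncIterator { AsyncIterator(labels: labels) }
}

// Loop syntax: the tuple pattern just destructures each element.
func joinedWithLoop(_ ticks: Ticks) async -> String {
    var parts: [String] = []
    for await (index, label) in ticks {
        parts.append("\(index):\(label)")
    }
    return parts.joined(separator: ",")
}

// Roughly what the loop expands to: `await` marks the next() call.
func joinedByHand(_ ticks: Ticks) async -> String {
    var parts: [String] = []
    var iterator = ticks.makeAsyncIterator()
    while let tick = await iterator.next() {
        let (index, label) = tick // plain, synchronous destructuring
        parts.append("\(index):\(label)")
    }
    return parts.joined(separator: ",")
}
```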

I would expect that operations on iterator types would generally be exclusive borrows (i.e. mutating).

Thanks for writing out this idea @CTMacUser.

It seems that the core design point is that the 'collective' type is not just a collection but also a kind of lazy asynchronous buffer. I believe that, if we decided such a type is useful, we could build it on top of AsyncSequence in a manner like the existing protocol hierarchy.

I'd like to reiterate that a core idea of the AsyncSequence protocol is a focus on ease of adoption. That does perhaps leave something on the table vs. a complete set of protocols that can be arbitrarily composed, but I think that the tradeoff towards simplicity is worth it.

Hi @jack,

You are correct that awareness of the cost of iterating the async collection has to be part of writing code that uses it.

I imagine the implementation of our hypothetical URL.lines() function above would iterate the contents of the URL each time you called for/in on it. This does not preclude the possibility of some lower level intelligent cache, nor does it preclude the possibility of introducing a buffering async collection type in the future.

I'm not sure if an API like lines would be written to return such a type, though, because it seems to require a buffer the size of the file whether you want it or not (and assumes a finite file size). It might be something we would optionally compose with a sequence-based lines API.
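The behavior described for the hypothetical URL.lines() can be sketched as a sequence that re-reads its source every time an iterator is made, with an Array acting as an optional buffering layer composed on top. `SourceCounter`, `Lines`, and the `text` property (a stand-in for the URL contents) are hypothetical:

```swift
// Counts how many times the underlying source is read.
final class SourceCounter {
    var reads = 0
}

struct Lines: AsyncSequence {
    typealias Element = String
    let text: String // stand-in for the URL's contents
    let counter: SourceCounter

    struct AsyncIterator: AsyncIteratorProtocol {
        var pending: [String]
        mutating func next() async -> String? {
            pending.isEmpty ? nil : pending.removeFirst()
        }
    }

    // Every for/in calls this, so every loop re-reads the source.
    func makeAsyncIterator() -> AsyncIterator {
        counter.reads += 1
        return AsyncIterator(pending: text.split(separator: "\n").map(String.init))
    }
}

// Two loops: two reads of the source.
func countThenRead(_ lines: Lines) async -> (count: Int, all: [String]) {
    var count = 0
    for await _ in lines { count += 1 }
    var all: [String] = []
    for await line in lines { all.append(line) }
    return (count, all)
}

// Buffering composed on top: one read, then in-memory passes are free.
func countThenReadBuffered(_ lines: Lines) async -> (count: Int, all: [String]) {
    var all: [String] = []
    for await line in lines { all.append(line) }
    return (all.count, all)
}
```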

If I understand correctly (a big if), we can mark the cancel function as __consuming, which will allow us to transition more smoothly to move-only implementations of future AsyncIterators once move-only types are finished. I imagine we will need to figure out a transition story for all iterators at that time, including the existing Sequence API, correct?
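`__consuming` has since been superseded by the `consuming` modifier (SE-0377, Swift 5.9). On a noncopyable type, a `consuming` method ends the value's lifetime, so the compiler rejects any use of the iterator after `cancel()`. A sketch with a hypothetical `Poller` iterator; `cancel()` returns an `Int` here only to make its effect observable:

```swift
struct Poller: ~Copyable {
    var polls = 0

    mutating func next() -> Int? {
        polls += 1
        return polls <= 3 ? polls : nil
    }

    // Consumes self: after this call the iterator no longer exists.
    consuming func cancel() -> Int {
        polls // how far iteration got before cancellation
    }
}

func pollOnceThenCancel() -> Int {
    var poller = Poller()
    _ = poller.next()
    let progress = poller.cancel()
    // poller.next() // uncommenting this is a compile-time error (use after consume)
    return progress
}
```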

The main point of the design I gave is that AsyncCollective (or whatever the name will be) can't be built on top of AsyncSequence; the abstract sub-typing actually goes the other way! (AsyncSequence refines AsyncCollective by imposing requirements for a fixed publication order and for that order being semantically significant.) ABI stability means that this can't be fixed later.

The thing I'm primarily concerned about is the language/compiler (in this case, the implementation of the for/in loop) knowing about this style of cancelation, not about the protocol requirement. This notion of cancelation doesn't have anything to do with async: the same issue arises today when breaking out of a for-in loop over a Sequence iterator that has internal state.

What I'm observing here is that we have the ability to model this today for Sequences (it is just inefficient due to the extra class indirection) and that ownership will allow this to be efficient as well, all without extending the language or compiler. We have exactly the same situation with the async version of this, so it would be nice if we didn't have to add this wart to the compiler and language as a short-term workaround.

Of course, with the compiler/language support dropped, it follows that it wouldn't have to be part of the protocol at all either, further increasing consistency here.

-Chris
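The claim that Sequences can model early-exit cleanup today, at the cost of a class indirection, can be sketched as an iterator holding a class instance whose `deinit` runs once the loop drops the iterator, including after a `break`. `ResourceLog`, `Resource`, `Numbers`, and `sumUntilBreak` are hypothetical:

```swift
// Records cleanup so the effect is observable.
final class ResourceLog {
    var released = false
}

// The class indirection: its deinit is the cleanup hook.
final class Resource {
    let log: ResourceLog
    init(log: ResourceLog) { self.log = log }
    deinit { log.released = true }
}

struct Numbers: Sequence {
    let limit: Int
    let log: ResourceLog

    struct Iterator: IteratorProtocol {
        let resource: Resource // released when the iterator is destroyed
        let limit: Int
        var current = 0
        mutating func next() -> Int? {
            guard current < limit else { return nil }
            current += 1
            return current
        }
    }

    func makeIterator() -> Iterator {
        Iterator(resource: Resource(log: log), limit: limit)
    }
}

func sumUntilBreak(log: ResourceLog) -> Int {
    var sum = 0
    for n in Numbers(limit: 100, log: log) {
        if n > 3 { break } // early exit: the loop's iterator is dropped
        sum += n
    }
    return sum
}
```

No compiler support for cancelation is involved; the cost is the heap allocation and reference counting for `Resource`, which is what ownership is expected to remove.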

Related to cancel/deinit:

Looking at the .NET equivalent to this, they have both a cancellation token and DisposeAsync. How does this design compare to that?

  1. If the cancel here could potentially be modelled as a deinit, is that equivalent to .NET’s Dispose? Should cancel be cancel() async?

  2. Is there an equivalent to the cancellation token, where the source of the items going into the AsyncSequence can be stopped? E.g., cancelling the network request from a separate thread/actor.
