Introducing AsyncCollection

Hello, Swift community! I'm here to discuss a new idea. I'm sorry if it was already discussed but if this was the case I wasn't able to find it.

I think would be a very logical consequence to AsyncSequence to implement AsyncCollection.

This could permit parallelized work, something that is not possible with AsyncSequence since AsyncIterator can only obtain one element at time.

So, for example, we could have this:

for await dog in dogs.preloaded(next: 3) {
    ... 
}

where preloaded starts loading the other 3 dogs without waiting to the first to be returned.

This would be very useful also to avoid using withTaskGroup to parallelize a dynamic number of child tasks. Borrowing the semantic of the async-algorithms package we could obtain this:

let dogIDs: [Dog.ID] = [...]
let dogs = await dogIDs.async.map { await db.getDog($0) }.groupedInTask()

instead of

let dogIDs: [Dog.ID] = [...]
let dogs = await withTaskGroup(of: Dog.self, returning: [Dog].self) { taskGroup in
    for dog in dogIDs {
        taskGroup.addTask {
            await db.getDog(id)
        }
    }
    var dogs = [Dog]()
    for await dog in taskGroup {
        dogs.append(dog)
    }
    return dogs
}

and to maintain order in this case we need extra logic too.

What do you think?

7 Likes

While I think the idea of preloading is worth investigating, I think that the name AsyncCollection doesn't fit. Collection's characteristic distinction over Sequence is having indices. AsyncCollection, in your pitch, doesn't.

1 Like

It should have Indeces indeed, in order to be able to traverse it, I think.

what are Indeces?

Isn't this basically like a database cursor's usecase?

I like and use what you're talking about, but it doesn't seem specifically relevant to collections, to me. With AsyncChannels, you can parallelize work and not have to wait for a whole task group to finish, to process values. There's probably a better approach than a set of maps, but I haven't seen or thought of it yet.

Your groupedInTask seems impossible to me—I think you need to start with something synchronous, because of what you said about AsyncIterator.

import AsyncAlgorithms
import HeapModule

public extension Sequence {
  /// Transform a sequence asynchronously, and potentially in parallel.
  /// - Returns: An `AsyncSequence` which returns transformed elements, in their original order,
  /// as soon as they become available.
  func mapWithTaskGroup<Transformed: Sendable>(
    priority: TaskPriority? = nil,
    _ transform: @escaping @Sendable (Element) async -> Transformed
  ) -> AsyncChannel<Transformed> {
    let channel = AsyncChannel<Transformed>()
    Task { await mapWithTaskGroup(channel: channel, transform) }
    return channel
  }

  /// Transform a sequence asynchronously, and potentially in parallel.
  /// - Returns: An `AsyncSequence` which returns transformed elements, in their original order,
  /// as soon as they become available.
  func mapWithTaskGroup<Transformed: Sendable>(
    priority: TaskPriority? = nil,
    _ transform: @escaping @Sendable (Element) async throws -> Transformed
  ) -> AsyncThrowingChannel<Transformed, Error> {
    let channel = AsyncThrowingChannel<Transformed, Error>()
    Task {
      do {
        try await mapWithTaskGroup(channel: channel, transform)
      } catch {
        channel.fail(error)
      }
    }
    return channel
  }
}

// MARK: - private
private protocol AsyncChannelProtocol<Element> {
  associatedtype Element
  func send(_: Element) async
  func finish()
}

extension AsyncChannel: AsyncChannelProtocol { }
extension AsyncThrowingChannel: AsyncChannelProtocol { }

private extension Sequence {
  private func mapWithTaskGroup<Transformed>(
    channel: some AsyncChannelProtocol<Transformed>,
    priority: TaskPriority? = nil,
    _ transform: @escaping @Sendable (Element) async throws -> Transformed
  ) async rethrows {
    typealias ChildTaskResult = Heap<Int>.ElementValuePair<Transformed>
    try await withThrowingTaskGroup(of: ChildTaskResult.self) { group in
      for (offset, element) in enumerated() {
        group.addTask(priority: priority) {
          .init(offset, try await transform(element))
        }
      }

      var heap = Heap<ChildTaskResult>()
      var lastSentOffset = -1
      for try await childTaskResult in group {
        heap.insert(childTaskResult)
        // Send as many in-order `Transformed`s as possible.
        while heap.min()?.element == lastSentOffset + 1 {
          await channel.send(heap.removeMin().value)
          lastSentOffset += 1
        }
      }

      channel.finish()
    }
  }
}
Heap.ElementValuePair
import HeapModule

public extension Heap {
  /// A "`Value`" that uses an accompanying `Heap.Element` for sorting  via a `Heap`.
  /// - Note: If `Value` itself is `Comparable`, it can of course be inserted into a Heap directly.
  ///   This type is explicitly for cases where a different sorting rule is desired.
  struct ElementValuePair<Value> {
    public var element: Element
    public var value: Value
  }
}

// MARK: - public
public extension Heap.ElementValuePair {
  init(_ element: Element, _ value: Value) {
    self.init(element: element, value: value)
  }
}

// MARK: - Comparable
extension Heap.ElementValuePair: Comparable {
  public static func < (lhs: Self, rhs: Self) -> Bool {
    lhs.element < rhs.element
  }

  /// Only necessary because Comparable: Equatable. 😞
  public static func == (lhs: Self, rhs: Self) -> Bool {
    fatalError()
  }
}

i think database cursors are more AsyncSequence-like than Collection-like, no? they are sequences of collections (batches).

it sounds like what is really being asked here is how to flatten the inner (non-async) collection without writing a nested loop.

Yeah, but the proposal/idea is not really a collection either. It rather feels like a cursor-like set of helpers on top of AsyncSequence

A type that represents the indices that are valid for subscripting the collection.

groupedInTask would be possible because .async should return an AsyncCollection from the synchronous collection.

I think it really should be like a collection indeed. The main gain of AsyncCollection from AsyncSequence would be the same between the synchronous counterparts. I'll cite the docs:

Although a sequence can be consumed as it is traversed, a collection is guaranteed to be multipass: Any element can be repeatedly accessed by saving its index. Moreover, a collection's indices form a finite range of the positions of the collection's elements. The fact that all collections are finite guarantees the safety of many sequence operations, such as using the contains(_:) method to test whether a collection includes an element.

So, thinking of AsyncCollection as an array of async closures, you could have access to every single closure at any time, instead of awaiting for the previous one to finish (as occurs with AsyncSequence). This access permits parallelization.

So for example you could use it in this way (AsyncArray implements AsyncCollection):

let dbEntities: AsyncArray = DB.getEntities()
let secondEntity = await dbEntities[1]
let entities = await dbEntities[0..<min(10, dbEntities.endIndex)] // this could load entities in parallel like in `withTaskGroup`, maintaining order furthermore.
1 Like

You have a map in between. I didn't realize that you imagined creating a new Collection type to replace all of the Sequence Conforming Types.

I think the concept of keeping parallelized asynchronous work in-order across multiple function calls is impossible to implement, regardless. You could process a sequence of Tasks, and group them at the end of a chain, but if that were realistic, why would have have TaskGroup instead of e.g. Sequence<Task>.concurrentPerform?

That's what I imagined you were going for. You're merging two useful concepts into one, when they should be kept separate.

  1. AsyncCollection would be useful for interface standardization, but realistically, subscripting based on a range is not going involve calling a subscript with a single index, multiple times, in parallel. It's going to batch up some work and return a synchronous collection.

  2. Parallelizing asynchronous work is appropriate for all sequences, not just collections, as I showed above.

No, why replace? Just add conformance to types that can implement it.

Why? Is already possibile with a raw implementation of async closure to just add to the result of the async closure the index of the collection and use that index to maintain order as the TaskGroup returns values. How is possible that a real core implementation involving runtime generated methods to not be able to obtain this?

I can't think of reasons to justify this design, surely there will be. Maybe it's the implementation that most closely resembles what happens at a low level.

That was just a random example I thought of what we could achieve (in terms of syntactic sugar). Subscripting with a range an AsyncCollection should return synchronously an AsyncSlice on that Collection, the await call then is made on that type.

Sure, this derives from the fact a synchronous Sequence can easily become a Collection (not safely however since Iterator could generate values infinitely). AsyncSequence could not become AsyncCollection (without waiting Iterator to return nil, collecting the result and making the collection of results an AsyncCollection)