Async concurrent and serial map

Feel free to comment on this code.

Is it correct?
Is it optimal?
Is it readable?

public enum AsyncIterationMode {
    case serial
    case concurrent(priority: TaskPriority?, parallelism: Int)

    public static let concurrent = concurrent(priority: nil, parallelism: ProcessInfo.processInfo.processorCount)
}

public extension Sequence {
    func asyncMap<NewElement>(
        mode: AsyncIterationMode = .concurrent,
        _ transform: @escaping (Element) async throws -> NewElement
    ) async rethrows -> [NewElement] {
        switch mode {

        case .serial:
            var result: [NewElement] = []
            result.reserveCapacity(underestimatedCount)
            for element in self {
                result.append(try await transform(element))
            }
            return result

        case let .concurrent(priority, parallelism):
            return try await withThrowingTaskGroup(of: (Int, NewElement).self) { group in
                var i = 0
                var iterator = self.makeIterator()
                var results = [NewElement?]()
                results.reserveCapacity(underestimatedCount)

                func submitTask() throws {
                    try Task.checkCancellation()
                    if let element = iterator.next() {
                        results.append(nil)
                        group.addTask(priority: priority) { [i] in (i, try await transform(element)) }
                        i += 1
                    }
                }

                // add initial tasks
                for _ in 0..<parallelism { try submitTask() }

                // submit more tasks, as each one completes, until we run out of work
                while let (index, result) = try await group.next() {
                    results[index] = result
                    try submitTask()
                }

                return results.compactMap { $0 }
            }
        }
    }
}
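For a quick smoke test, here is the same sliding-window idea in a standalone, simplified form (the function name and setup below are illustrative, not part of the extension above):

```swift
// Illustrative standalone version of the pattern above: keep at most
// `parallelism` child tasks in flight, appending a nil placeholder
// before each task is submitted so that `results[index] = value` is
// in bounds by the time the corresponding task finishes.
func slidingWindowMap<T, R>(
    _ items: [T],
    parallelism: Int,
    _ transform: @escaping (T) async throws -> R
) async throws -> [R] {
    try await withThrowingTaskGroup(of: (Int, R).self) { group in
        var iterator = items.makeIterator()
        var i = 0
        var results = [R?]()

        func submit() {
            guard let element = iterator.next() else { return }
            results.append(nil)  // reserve the slot before the task runs
            group.addTask { [i] in (i, try await transform(element)) }
            i += 1
        }

        // fill the window, then top it up as each task completes
        for _ in 0..<parallelism { submit() }
        while let (index, value) = try await group.next() {
            results[index] = value
            submit()
        }
        return results.compactMap { $0 }
    }
}

let squares = try await slidingWindowMap(Array(1...8), parallelism: 3) { $0 * $0 }
print(squares)  // [1, 4, 9, 16, 25, 36, 49, 64]
```

Because each task carries its index, the output order matches the input order regardless of which tasks finish first.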

A few comments on the correctness part:

  1. A guard parallelism > 0 in the concurrent case may be helpful in preventing unexpected behavior (with 0, no initial tasks are submitted, so a non-empty sequence silently maps to an empty array)
  2. results.reserveCapacity(underestimatedCount) doesn't guarantee that the subsequent results[index] = result won't crash. If the extension were defined on Collection, I would simply use var results = [NewElement?](repeating: nil, count: count) to initialize
  3. Before returning the final result, it might be good to check that iteration through the sequence has finished and that its length matches the length of the final result

Also, this gist might help: Swift async/await implementation of a parallel map · GitHub
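Sketching suggestions 1 and 2 together, a hypothetical Collection-constrained variant might look like this (the method name is made up for illustration):

```swift
// Hypothetical Collection-constrained variant incorporating the two
// suggestions: validate `parallelism` up front, and pre-size the
// buffer with `count` so every `results[index] = value` write is in
// bounds by construction.
extension Collection {
    func concurrentMapSketch<R>(
        parallelism: Int,
        _ transform: @escaping (Element) async throws -> R
    ) async throws -> [R] {
        precondition(parallelism > 0, "parallelism must be positive")
        return try await withThrowingTaskGroup(of: (Int, R).self) { group in
            var results = [R?](repeating: nil, count: count)
            var submitted = 0
            var iterator = makeIterator()

            func submit() {
                guard let element = iterator.next() else { return }
                group.addTask { [submitted] in (submitted, try await transform(element)) }
                submitted += 1
            }

            for _ in 0..<Swift.min(parallelism, count) { submit() }
            while let (index, value) = try await group.next() {
                results[index] = value
                submit()
            }
            // every index 0..<count was written exactly once
            return results.map { $0! }
        }
    }
}

let lengths = try await ["a", "bb", "ccc"].concurrentMapSketch(parallelism: 2) { $0.count }
print(lengths)  // [1, 2, 3]
```

The pre-sized buffer also makes the final result a plain map instead of a compactMap, since a dropped slot would be a logic error rather than something to silently skip.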

Great catch!

No, but calling append(nil) before kicking off another Task always makes sure that results[i] exists by the time the corresponding task completes, no?

Good idea!

Seems like this is almost exactly the same, except it works on Collection instead of Sequence and can therefore rely on count. Otherwise, the idea is almost too similar to be a coincidence, so I don't think I can be far off :-)