Feel free to comment on this code.
Is it correct?
Is it optimal?
Is it readable?
public enum AsyncIterationMode {
case serial
case concurrent(priority: TaskPriority?, parallellism: Int)
public static let concurrent = concurrent(priority: nil, parallellism: ProcessInfo.processInfo.processorCount)
}
public extension Sequence {
func asyncMap<NewElement>(
mode: AsyncIterationMode = .concurrent,
_ transform: @escaping (Element) async throws -> NewElement
) async rethrows -> [NewElement] {
switch mode {
case .serial:
var result: [NewElement] = []
result.reserveCapacity(underestimatedCount)
for element in self {
result.append(try await transform(element))
}
return result
case let .concurrent(priority, paralellism):
return try await withThrowingTaskGroup(of: (Int, NewElement).self) { group in
var i = 0
var iterator = self.makeIterator()
var results = [NewElement?]()
results.reserveCapacity(underestimatedCount)
func submitTask() throws {
try Task.checkCancellation()
if let element = iterator.next() {
results.append(nil)
group.addTask(priority: priority) { [i] in (i, try await transform(element)) }
i += 1
}
}
// add initial tasks
for _ in 0..<paralellism { try submitTask() }
// submit more tasks, as each one completes, until we run out of work
while let (index, result) = try await group.next() {
results[index] = result
try submitTask()
}
return results.compactMap { $0 }
}
}
}
}