sveinhal
(Svein Halvor Halvorsen)
1
Feel free to comment on this code.
Is it correct?
Is it optimal?
Is it readable?
public enum AsyncIterationMode {
    case serial
    case concurrent(priority: TaskPriority?, parallelism: Int)

    public static let concurrent = concurrent(priority: nil, parallelism: ProcessInfo.processInfo.processorCount)
}

public extension Sequence {
    func asyncMap<NewElement>(
        mode: AsyncIterationMode = .concurrent,
        _ transform: @escaping (Element) async throws -> NewElement
    ) async rethrows -> [NewElement] {
        switch mode {
        case .serial:
            var result: [NewElement] = []
            result.reserveCapacity(underestimatedCount)
            for element in self {
                result.append(try await transform(element))
            }
            return result

        case let .concurrent(priority, parallelism):
            return try await withThrowingTaskGroup(of: (Int, NewElement).self) { group in
                var i = 0
                var iterator = self.makeIterator()
                var results = [NewElement?]()
                results.reserveCapacity(underestimatedCount)

                func submitTask() throws {
                    try Task.checkCancellation()
                    if let element = iterator.next() {
                        results.append(nil)
                        group.addTask(priority: priority) { [i] in (i, try await transform(element)) }
                        i += 1
                    }
                }

                // add initial tasks
                for _ in 0..<parallelism { try submitTask() }

                // submit more tasks, as each one completes, until we run out of work
                while let (index, result) = try await group.next() {
                    results[index] = result
                    try submitTask()
                }

                return results.compactMap { $0 }
            }
        }
    }
}
ksemianov
(Konstantin Semianov)
2
A few comments on the correctness part:
- A guard parallelism > 0 in the concurrent case may be helpful in preventing unexpected behavior.
- results.reserveCapacity(underestimatedCount) doesn't guarantee that the subsequent results[index] = result won't crash. If the extension were defined on Collection, I would simply initialize with var results = [NewElement?](repeating: nil, count: count).
- Before returning the final result, it might be good to check that iteration through the sequence has finished and that its length matches the length of the final result.
Also, this gist might help: "Swift async/await implementation of a parallel map" (GitHub).
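These suggestions could be folded in along the following lines. This is only a sketch: the name concurrentMap, the default for parallelism, and the plain throws (rather than rethrows) signature are my choices, not from the thread or the gist, and I use precondition instead of guard since silently returning from a non-empty collection would be surprising.

```swift
import Foundation

public extension Collection {
    func concurrentMap<NewElement>(
        parallelism: Int = ProcessInfo.processInfo.processorCount,
        _ transform: @escaping (Element) async throws -> NewElement
    ) async throws -> [NewElement] {
        precondition(parallelism > 0, "parallelism must be positive")
        return try await withThrowingTaskGroup(of: (Int, NewElement).self) { group in
            // Collection gives us `count`, so every slot exists up front.
            var results = [NewElement?](repeating: nil, count: count)
            // Pair each index with its output slot number.
            var iterator = zip(indices, 0...).makeIterator()

            func submitTask() throws {
                try Task.checkCancellation()
                if let (index, slot) = iterator.next() {
                    let element = self[index]
                    group.addTask { (slot, try await transform(element)) }
                }
            }

            // Seed the group, then keep it topped up as tasks complete.
            for _ in 0..<parallelism { try submitTask() }
            while let (slot, value) = try await group.next() {
                results[slot] = value
                try submitTask()
            }
            // Exactly `count` tasks completed, one per slot, so every slot is filled.
            return results.map { $0! }
        }
    }
}
```

The fixed-size array means no index can ever be out of bounds, and the final map { $0! } doubles as the completeness check from the last bullet: a nil slot would trap rather than silently drop an element.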
sveinhal
(Svein Halvor Halvorsen)
3
Great catch!
No, but always calling append(nil) before kicking off another Task makes sure that results[i] exists by the time the corresponding task completes, no?
Good idea!
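The invariant being claimed here can be checked in isolation: append(nil) runs before the task for that index is spawned, so the later results[index] = value is always in bounds. A minimal, self-contained illustration (the input values are mine, not from the thread):

```swift
let input = [10, 20, 30]
let output = await withTaskGroup(of: (Int, Int).self) { group -> [Int] in
    var results = [Int?]()
    var i = 0
    for element in input {
        results.append(nil)                        // slot i exists before the task is spawned
        group.addTask { [i] in (i, element * 2) }
        i += 1
    }
    while let (index, value) = await group.next() {
        results[index] = value                     // index < results.count always holds
    }
    return results.compactMap { $0 }
}
// Order is preserved regardless of completion order: [20, 40, 60]
```

The subtlety in the original code is not this invariant itself but that compactMap would silently drop any slot left nil, which is why checking completeness before returning is still worthwhile.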
sveinhal
(Svein Halvor Halvorsen)
4
Seems like this is almost exactly the same, except that it works on Collection instead of Sequence and can therefore rely on count. Otherwise, the ideas are almost identical, so I don't think I can be far off :-)