I'm looking for a smooth, functional way to map a Sequence into an AsyncSequence.
Imagine you have an array of user IDs and you want to fetch the corresponding users — how do you do this asynchronously?
let userIds = [1, 2, 3]

let users = userIds
    .asyncMap { id in // I haven't found a method like this anywhere.
        await SomeClass.fetchUser(byId: id)
    }

for await user in users {
    print("Username: \(user.name)")
}
I looked in the docs, but there doesn't seem to be a native way to do this.
I've heard of the swift-async-algorithms package, but I would have expected this to be handled directly in the standard library.
To me it's very unintuitive that you can't do this directly.
I assume the async algorithms will be merged into the standard library over time, right?
Personally, I would welcome this being merged into the standard library.
But I also think its current form as a standalone package is good enough, since Swift packages can be integrated fairly easily in most situations.
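For reference, here is a rough sketch of how the original example could look with the swift-async-algorithms package; SomeClass.fetchUser(byId:) is still the hypothetical API from the question, and note that map here awaits the transform one element at a time rather than in parallel:

import AsyncAlgorithms

let userIds = [1, 2, 3]

// `.async` (from swift-async-algorithms) wraps the array in an AsyncSequence;
// the standard `map` then accepts an async closure, but runs it serially, element by element.
let users = userIds.async.map { id in
    await SomeClass.fetchUser(byId: id)
}

for await user in users {
    print("Username: \(user.name)")
}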
let friends = try await fetchFriendList()

async let profiles = friends.asyncMap { friend in
    try await fetchUserProfile(for: friend)
}
Implementation
import Foundation

public enum AsyncIterationMode {
    /// Serial iteration performs each step in sequence, waiting for the previous one to complete before performing the next.
    case serial
    /// Concurrent iteration performs all steps in parallel, and resumes execution when all operations are done.
    /// When applied to `asyncMap`, the results are returned in the original order.
    case concurrent(priority: TaskPriority?, parallelism: Int)
    /// Concurrent iteration with default priority and the current system's processor count.
    public static let concurrent = concurrent(priority: nil, parallelism: ProcessInfo.processInfo.processorCount)
}
public extension Sequence {
    func asyncForEach(mode: AsyncIterationMode = .concurrent, _ operation: @escaping (Element) async throws -> Void) async rethrows {
        switch mode {
        case .serial:
            for element in self {
                try await operation(element)
            }
        case .concurrent:
            _ = try await asyncMap(mode: mode, operation)
        }
    }

    func asyncMap<NewElement>(
        mode: AsyncIterationMode = .concurrent,
        _ transform: @escaping (Element) async throws -> NewElement
    ) async rethrows -> [NewElement] {
        switch mode {
        case .serial:
            var result: [NewElement] = []
            result.reserveCapacity(underestimatedCount)
            for element in self {
                result.append(try await transform(element))
            }
            return result

        case let .concurrent(priority, parallelism):
            return try await withThrowingTaskGroup(of: (Int, NewElement).self) { group in
                var i = 0
                var iterator = self.makeIterator()
                var results = [NewElement?]()
                results.reserveCapacity(underestimatedCount)

                // Submits the next element (if any) as a child task, tagging it with its index
                // so the results can be put back in the original order.
                func submitTask() throws {
                    try Task.checkCancellation()
                    if let element = iterator.next() {
                        results.append(nil)
                        group.addTask(priority: priority) { [i] in (i, try await transform(element)) }
                        i += 1
                    }
                }

                // add initial tasks
                for _ in 0..<parallelism { try submitTask() }

                // submit more tasks, as each one completes, until we run out of work
                while let (index, result) = try await group.next() {
                    results[index] = result
                    try submitTask()
                }

                return results.compactMap { $0 }
            }
        }
    }
}
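For illustration (not part of the original post), a usage sketch with a hypothetical async throwing fetchUser(byId:), building on the extension above:

let userIds = [1, 2, 3]

// Run at most four fetches at a time; results come back in the original order.
let users = try await userIds.asyncMap(mode: .concurrent(priority: nil, parallelism: 4)) { id in
    try await fetchUser(byId: id)
}

for user in users {
    print("Username: \(user.name)")
}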
I suspect the async method from the algorithms package is not what you want: you presumably want to spawn a number of network requests in parallel to make things faster.
Firstly, there's the question of whether you want all the results at once (i.e. you want to "zip" them), or whether you're fine with the results arriving in any order, at any time. The latter makes sense in GUI apps, i.e. showing data as it arrives, no matter the order.
And most importantly, there's the question of how many requests you send simultaneously. What if your array has 10,000 elements? Are you OK with launching 10,000 simultaneous URLRequests? This is where it gets a bit trickier: you'd need a semaphore that limits the number of in-flight requests at any given time (unless there's a more modern way of doing it in Swift that I'm unaware of; there might be).
I think the above problems are best addressed with task groups: they give you all the flexibility you need.
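As a rough sketch of that last point (not from the original reply), here's the task-group pattern that keeps at most maxConcurrent requests in flight, using a hypothetical User type and fetchUser(byId:) function:

func fetchUsers(ids: [Int], maxConcurrent: Int = 4) async throws -> [User] {
    try await withThrowingTaskGroup(of: (Int, User).self) { group in
        var results = [User?](repeating: nil, count: ids.count)
        var nextIndex = 0

        // Seed the group with up to `maxConcurrent` requests.
        while nextIndex < min(maxConcurrent, ids.count) {
            let index = nextIndex
            group.addTask { (index, try await fetchUser(byId: ids[index])) }
            nextIndex += 1
        }

        // Whenever a request finishes, start the next one, so at most
        // `maxConcurrent` requests are running at any given time.
        while let (index, user) = try await group.next() {
            results[index] = user
            if nextIndex < ids.count {
                let index = nextIndex
                group.addTask { (index, try await fetchUser(byId: ids[index])) }
                nextIndex += 1
            }
        }

        // Every slot has been filled at this point; the indices keep the original order.
        return results.compactMap { $0 }
    }
}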
Yes, I want to fetch some data in parallel, and I don't need all of it at once.
After thinking about it again, I think you're right. An AsyncSequence uses one thread, right? So it's concurrent but not parallel?
I think in my specific use case there's no need for multiple threads, but I'm still interested in a parallel solution for this.
Normally I'd say this is a case for a TaskGroup, but that's not functional in style.
Is there a solution for this in async-algorithms?
Are there currently no replacements for semaphores in Swift's structured concurrency? I think some kind of executor that encapsulates two queues (one as a buffer and one with a fixed size for task execution) would be much more Swifty.
If you want to do an async map in parallel using task groups, I've written this before, though last I checked it generates compiler warnings because unsafe pointers aren't Sendable.
extension Collection {
    func parallelMap<T: Sendable>(transform: @Sendable @escaping (Element) async -> T) async -> [T]
    where Element: Sendable {
        let ptr = UnsafeMutableBufferPointer<T>.allocate(capacity: self.count)
        defer {
            ptr.deinitialize()
            ptr.deallocate()
        }
        await withTaskGroup(of: Void.self) { group in
            for (offset, element) in self.enumerated() {
                group.addTask {
                    // Each child task writes its result straight into its own slot of the buffer.
                    let result = await transform(element)
                    ptr.initializeElement(at: offset, to: result)
                }
            }
            await group.waitForAll()
        }
        return Array(ptr)
    }
}
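A quick usage sketch, with a trivial stand-in transform just to show the shape of the call:

let squares = await [1, 2, 3, 4].parallelMap { value in
    // Stand-in for real async work, e.g. a network request.
    value * value
}
print(squares) // [1, 4, 9, 16]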
Since this returns an array, why not simplify it, and make the code friendlier to concurrency checks at the same time?
That's wrong, see below
extension Collection {
    func parallelMap<T: Sendable>(
        transform: @Sendable @escaping (Element) async -> T
    ) async -> [T] where Element: Sendable {
        await withTaskGroup(of: T.self) { group in
            for element in self {
                group.addTask {
                    await transform(element)
                }
            }
            var result: [T] = []
            for await element in group {
                result.append(element)
            }
            return result
        }
    }
}
This breaks the ordering promise of map, doesn't it? IIRC, iterating over a TaskGroup gives results in the order the tasks finished, not the order they were added.
Whoops, thanks. Then replacing it with a plain array won't work, you're right, but it's still possible to address the warnings/errors in the buffer-pointer version:
extension Collection {
    func parallelMap<T: Sendable>(
        transform: @Sendable @escaping (Element) async -> T
    ) async -> [T] where Element: Sendable {
        await withTaskGroup(of: (Int, T).self) { group in
            for (index, element) in self.enumerated() {
                group.addTask {
                    let transformedElement = await transform(element)
                    return (index, transformedElement)
                }
            }
            let ptr = UnsafeMutableBufferPointer<T>.allocate(capacity: self.count)
            defer {
                ptr.deinitialize()
                ptr.deallocate()
            }
            // The buffer is only touched here, outside the child tasks,
            // so the pointer never has to cross a concurrency boundary.
            for await element in group {
                ptr.initializeElement(at: element.0, to: element.1)
            }
            return Array(ptr)
        }
    }
}
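For completeness, a pointer-free sketch of the same idea, using the index-tagging trick from the asyncMap implementation earlier in the thread (the name parallelMapSafe is just for illustration):

extension Collection {
    func parallelMapSafe<T: Sendable>(
        transform: @Sendable @escaping (Element) async -> T
    ) async -> [T] where Element: Sendable {
        await withTaskGroup(of: (Int, T).self) { group in
            for (index, element) in self.enumerated() {
                group.addTask { (index, await transform(element)) }
            }
            // Collect into an optional array indexed by position, then unwrap.
            var results = [T?](repeating: nil, count: count)
            for await (index, value) in group {
                results[index] = value
            }
            return results.compactMap { $0 }
        }
    }
}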
Yeah, the .serial version of this is pretty naïve; I added it after initially supporting only the .concurrent (or I guess it should be called .parallel?) iteration mode.
For my use case it works fine, but I guess it should be easy to check for cancellation by calling try Task.checkCancellation() in each iteration.
The main goal is to have a map that supports parallelism, though.