We know that OperationQueue
is still commonly used in codebases to achieve parallel decomposition of work. Its counterpart in Structured Concurrency is TaskGroup
. However, OperationQueue
provides a convenient way to limit parallelism using the maxConcurrentOperationCount
property. This is particularly useful when making multiple API calls in parallel without overwhelming the server. Unfortunately, TaskGroup
does not offer a built-in alternative.
We can achieve similar functionality using a combination of Actor
, TaskGroup
, and AsyncStream
. I propose the following solution:
fileprivate actor TaskLimiter {
private let maxConcurrentTasks: Int
private var runningTasks = 0
init(maxConcurrentTasks: Int) {
self.maxConcurrentTasks = maxConcurrentTasks
}
func acquire() async {
while runningTasks >= maxConcurrentTasks {
await Task.yield() // Yield to allow other tasks to proceed
}
runningTasks += 1
}
func release() {
runningTasks -= 1
}
}
public func limitedParallelismTaskGroup<T>(
maxConcurrentTasks: Int,
tasks: [@Sendable () async -> T]
) -> AsyncStream<T> where T: Sendable {
let limiter = TaskLimiter(maxConcurrentTasks: maxConcurrentTasks)
return AsyncStream { continuation in
Task {
await withTaskGroup(of: T.self) { group in
for task in tasks {
group.addTask {
await limiter.acquire() // Wait for a slot
let result = await task()
await limiter.release()
return result
}
}
for await result in group {
continuation.yield(result)
}
continuation.finish()
}
}
}
}
The actor works similarly to DispatchSemaphore
for limiting parallelism. While a task waits to acquire a slot, Task.yield()
is called to free up resources for other tasks.
An example usage is shown below:
func mockTask(id: Int) async -> String {
try? await Task.sleep(nanoseconds: UInt64(id * 100_000_000)) // simulate variable delay
return "Task \(id) completed"
}
let mockTasks: [@Sendable () async -> String] = (1...10).map { id in
return { await mockTask(id: id) }
}
// Consume the AsyncStream returned by the limiter
func doSomething() {
Task {
let stream = limitedParallelismTaskGroup(maxConcurrentTasks: 3, tasks: mockTasks)
for await result in stream {
print(result)
}
}
}
Similarly, a throwing variant can also be exposed as a global function. Adding this to the standard library would be highly beneficial for developers, especially since OperationQueue
is not available in the newer Swift Foundation.