[Pitch] Limited Parallelism TaskGroup

We know that OperationQueue is still commonly used in codebases to achieve parallel decomposition of work. Its counterpart in Structured Concurrency is TaskGroup. However, OperationQueue provides a convenient way to limit parallelism using the maxConcurrentOperationCount property. This is particularly useful when making multiple API calls in parallel without overwhelming the server. Unfortunately, TaskGroup does not offer a built-in alternative.
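
For reference, this is the OperationQueue facility being referred to; a minimal example (makeAPICall is a hypothetical stand-in):

import Foundation

let queue = OperationQueue()
queue.maxConcurrentOperationCount = 3 // at most 3 operations run in parallel

for id in 1...10 {
    queue.addOperation {
        makeAPICall(id: id) // only 3 of these ever execute at the same time
    }
}
queue.waitUntilAllOperationsAreFinished()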

We can achieve similar functionality using a combination of an actor, TaskGroup, and AsyncStream. I propose the following solution:

fileprivate actor TaskLimiter {
    private let maxConcurrentTasks: Int
    private var runningTasks = 0

    init(maxConcurrentTasks: Int) {
        self.maxConcurrentTasks = maxConcurrentTasks
    }

    func acquire() async {
        while runningTasks >= maxConcurrentTasks {
            await Task.yield() // Yield to allow other tasks to proceed
        }
        runningTasks += 1
    }

    func release() {
        runningTasks -= 1
    }
}

public func limitedParallelismTaskGroup<T>(
    maxConcurrentTasks: Int,
    tasks: [@Sendable () async -> T]
) -> AsyncStream<T> where T: Sendable {
    let limiter = TaskLimiter(maxConcurrentTasks: maxConcurrentTasks)
    return AsyncStream { continuation in
        Task {
            await withTaskGroup(of: T.self) { group in
                for task in tasks {
                    group.addTask {
                        await limiter.acquire() // Wait for a slot
                        let result = await task()
                        await limiter.release()
                        return result
                    }
                }

                for await result in group {
                    continuation.yield(result)
                }
                continuation.finish()
            }
        }
    }
}

The actor works similarly to DispatchSemaphore for limiting parallelism. While a task waits to acquire a slot, it repeatedly calls Task.yield() so that other tasks can make progress.
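
The busy-wait can also be avoided entirely by parking waiters in continuations, closer to how a real semaphore suspends. A minimal sketch, assuming the released slot is handed directly to the next waiter (cancellation handling omitted):

fileprivate actor SuspendingTaskLimiter {
    private let maxConcurrentTasks: Int
    private var runningTasks = 0
    private var waiters: [CheckedContinuation<Void, Never>] = []

    init(maxConcurrentTasks: Int) {
        self.maxConcurrentTasks = maxConcurrentTasks
    }

    func acquire() async {
        if runningTasks < maxConcurrentTasks {
            runningTasks += 1
            return
        }
        // Suspend until release() hands over the slot it is giving up.
        await withCheckedContinuation { continuation in
            waiters.append(continuation)
        }
        // No increment here: release() kept the slot counted on our behalf.
    }

    func release() {
        if waiters.isEmpty {
            runningTasks -= 1
        } else {
            // Hand the slot directly to a waiter instead of freeing it,
            // so a newly arriving acquire() cannot steal it in between.
            waiters.removeFirst().resume()
        }
    }
}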

An example usage is shown below:

func mockTask(id: Int) async -> String {
    try? await Task.sleep(nanoseconds: UInt64(id * 100_000_000)) // simulate variable delay
    return "Task \(id) completed"
}

let mockTasks: [@Sendable () async -> String] = (1...10).map { id in
    return { await mockTask(id: id) }
}

// Consume the AsyncStream returned by the limiter
func doSomething() {
    Task {
        let stream = limitedParallelismTaskGroup(maxConcurrentTasks: 3, tasks: mockTasks)
        for await result in stream {
            print(result)
        }
    }
}

Similarly, a throwing variant can also be exposed as a global function, as sketched below. Adding this to the standard library would be highly beneficial for developers, especially since OperationQueue is not available in the newer swift-foundation package.
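
For reference, a sketch of that throwing variant, reusing the TaskLimiter above (the function name and the choice of AsyncThrowingStream are assumptions):

public func limitedParallelismThrowingTaskGroup<T: Sendable>(
    maxConcurrentTasks: Int,
    tasks: [@Sendable () async throws -> T]
) -> AsyncThrowingStream<T, Error> {
    let limiter = TaskLimiter(maxConcurrentTasks: maxConcurrentTasks)
    return AsyncThrowingStream { continuation in
        Task {
            do {
                try await withThrowingTaskGroup(of: T.self) { group in
                    for task in tasks {
                        group.addTask {
                            await limiter.acquire()
                            do {
                                let result = try await task()
                                await limiter.release()
                                return result
                            } catch {
                                await limiter.release() // release the slot on failure too
                                throw error
                            }
                        }
                    }
                    for try await result in group {
                        continuation.yield(result)
                    }
                }
                continuation.finish()
            } catch {
                continuation.finish(throwing: error)
            }
        }
    }
}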

Yeah, this has been a frequently asked-for extension.

There's a quick pattern you can plop in your sources that we explained in this WWDC session: Limiting concurrent tasks in TaskGroups @ 11:27 in Beyond the basics of structured concurrency:

func chopIngredients(_ ingredients: [any Ingredient]) async -> [any ChoppedIngredient] {
    return await withTaskGroup(of: (ChoppedIngredient?).self,
                               returning: [any ChoppedIngredient].self) { group in
        // Concurrently chop ingredients, at most three at a time
        let maxChopTasks = min(3, ingredients.count)
        for ingredientIndex in 0..<maxChopTasks {
            group.addTask { await chop(ingredients[ingredientIndex]) }
        }

        // Collect chopped vegetables, starting the next chop as each finishes
        var choppedIngredients: [any ChoppedIngredient] = []
        var nextIngredientIndex = maxChopTasks
        for await choppedIngredient in group {
            if nextIngredientIndex < ingredients.count {
                let nextIndex = nextIngredientIndex
                group.addTask { await chop(ingredients[nextIndex]) }
                nextIngredientIndex += 1
            }
            if let choppedIngredient {
                choppedIngredients.append(choppedIngredient)
            }
        }
        return choppedIngredients
    }
}

But a built-in pattern would indeed be nicer and worth exploring.

Though I do think we'd perhaps like to revisit the general ergonomics of task groups before we jump into this.

The proposed code uses an array, which is a bit limiting: it still requires creating all the task factories up front and having a known size. Instead, I think we need to keep in mind potentially infinite sources of tasks that just want to process "at most 4 at the same time".

There's also the question of whether it could be a mapConcurrent or mapParallel, rather than the full-blown task group syntax.

Implementation-wise, I think that if this were built into the existing core task group implementation, it could be done more efficiently than a plain high-level Swift implementation: we could immediately "kick off" a new task when one completes, without having to await explicitly. The awaiting, as it exists today, is there to collect results; it isn't tied to kicking off new work.
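
To make that concrete, here is roughly what the "potentially infinite source" version looks like when approximated on top of today's TaskGroup, starting a replacement task each time one completes (a sketch; forEachLimited and its parameter names are mine):

func forEachLimited<Source: AsyncSequence>(
    _ source: Source,
    maxConcurrentTasks: Int,
    work: @escaping @Sendable (Source.Element) async -> Void
) async throws where Source.Element: Sendable {
    try await withThrowingTaskGroup(of: Void.self) { group in
        var iterator = source.makeAsyncIterator()
        var running = 0

        // Fill the available slots first...
        while running < maxConcurrentTasks, let element = try await iterator.next() {
            group.addTask { await work(element) }
            running += 1
        }

        // ...then start one replacement task for each completion.
        while running > 0 {
            _ = try await group.next()
            running -= 1
            if let element = try await iterator.next() {
                group.addTask { await work(element) }
                running += 1
            }
        }
    }
}

Note how the explicit await on group.next() is exactly the indirection a core implementation could remove.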

I too felt that taking the tasks as an array was somewhat weird, since all the task factories have to be provided up front. But having built the implementation on TaskGroup, I had no other option, as a TaskGroup can only be obtained as an inout argument via the withTaskGroup method.

As you said, if we implement this in the core implementation rather than in high-level Swift, we wouldn't have such problems. As I am new to the Swift open-source codebase, please point me to this core implementation so that I can maybe arrive at a better solution.

There's also the question of whether it could be a mapConcurrent or mapParallel, rather than the full-blown task group syntax.

Also, I couldn't understand the above statement. Can you explain it?

Such "process an input stream with processing at most N concurrently" can be expressed as a method on such iterator.

For inspiration: I implemented this in the past over here: mapAsync, mapAsyncUnordered, or even mapAsyncPartitioned, where parallelism is limited per partition the processing is assigned to... They have some pretty good examples, so that should explain what I meant by it maybe being just a method on an async sequence.
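
For readers who don't open the links, the call-site shape being described is roughly this (a hypothetical sketch: mapAsync(parallelism:) mirrors the linked library and is not a standard API, .async is the Sequence adapter from swift-async-algorithms, and urls/download/render are stand-ins):

let pages = urls.async
    .mapAsync(parallelism: 4) { url in
        await download(url)
    }

// Results arrive as an AsyncSequence while at most 4 downloads run at once.
for await page in pages {
    render(page)
}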

There have been concerns about such APIs using an explicit number to configure the parallelism, which isn't very portable: if you write a library and hardcode parallelism to "6", it may be fine on a Mac or a server, but the same library may be used on a low-power device like a watch, where that much parallelism would not be a great idea. So there are concerns about any API that offers a way to hardcode a "number" of parallelism. It certainly is a common request, and it sometimes makes sense to just "process at most two in parallel", but such APIs are prone to being abused a bit because of the differing device runtime stories.

That isn't to say we won't end up with a task group "object" shape for this pattern, but we should consider both options and their tradeoffs :slight_smile:

Either way, it'd be interesting to see what we can come up with here.

And FYI, the implementation of task groups is in TaskGroup.cpp in the Swift repository. Sadly, it is not implemented in Swift and does some tricky things with its status management.

Although I'd also like to have better ergonomics for task groups, I think it'd be nice for these "limiting" patterns to live on their own, so they could be freely used without being tied to tasks or task groups.

I will rethink the solution without using TaskGroup (maybe a custom implementation of the AsyncSequence protocol) and will post it here.

As for hardcoding the degree of parallelism, we could consider handling it internally based on the current device's hardware (OperationQueue also takes maxConcurrentOperationCount from the caller, but whether the count is fully respected depends on the number of available CPU cores).
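
For example, a sketch of that clamping idea using Foundation's ProcessInfo:

import Foundation

// Respect the caller's limit, but never exceed the hardware parallelism,
// similar in spirit to how OperationQueue treats its count as an upper bound.
let requested = 16
let effectiveLimit = min(requested, ProcessInfo.processInfo.activeProcessorCount)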

As you said, the TaskGroup.cpp code is tricky and hard to infer. Maybe if you can loop in a subject-matter expert on it, we can discuss ideas for achieving our goal.

That would be me :wink:

First let's discuss API layers and exact semantics, then we can worry about implementation.

For example, I like the idea of a max count that may not be respected, but we'll have to see whether other folks find that acceptable. Overall we're trying to offer better APIs than the existing ones, not just copy patterns, so we'll have to find out whether that API was causing trouble in practice or works well enough, etc.

I wouldn't worry about the case where the specified max count is larger than the number of CPU cores, because not specifying a max at all is even worse.

A separate concern is that I may want to keep the number of tasks for a given job below the number of cores, i.e. keep some cores free for other work. It comes up often in UI apps: some number of, say, media-decoding tasks should run while one or two cores stay free for the UI itself.

I don't know if formally specifying the max relative to the number of cores is a good idea, but maybe something like this makes sense?

limitedParallelismTaskGroup(max: .explicit(3)) // up to 3 tasks
limitedParallelismTaskGroup(max: .relative(0.8)) // use up to 80% of hardware resources
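
A sketch of what that configuration type could look like (the name MaxParallelism and the use of activeProcessorCount are assumptions on my part):

import Foundation

enum MaxParallelism {
    case explicit(Int)    // an absolute task count
    case relative(Double) // a fraction of the hardware parallelism, in 0...1

    /// Resolve to a concrete limit on the current device, never below 1.
    var resolvedLimit: Int {
        switch self {
        case .explicit(let count):
            return max(1, count)
        case .relative(let fraction):
            let cores = ProcessInfo.processInfo.activeProcessorCount
            return max(1, Int((Double(cores) * fraction).rounded()))
        }
    }
}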

P.S. this seems to me like a nice alternative to prioritization without the downsides of prioritization.

One important point is that one really wants this across task groups.

I might have ten different task groups that together should use all available cores except, e.g., two, which would be reserved for UI and more interactive tasks.

This can be handled to some extent with priorities, but if the subtasks don't yield nicely it is still a problem.

We often check the number of available cores and use that minus a couple, but that fails in the above scenario.

So some way to have task groups that in aggregate never use more than X is of interest, a bit like what could be done with GCD target queues, I guess. But the problem is more at a system level here, as different libraries would need to coordinate; fundamentally, they shouldn't be allowed to take the last X available threads from the concurrent pool across all task groups. Then it'd work.
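
Until something system-wide exists, one partial workaround is to share a single limiter between otherwise independent task groups; a sketch reusing the acquire/release shape from the pitch (MediaItem and decode are stand-ins, and it only helps if every participating library can see the shared limiter):

// One process-wide limiter that several task groups draw permits from.
let sharedLimiter = TaskLimiter(maxConcurrentTasks: 6)

func runDecodingJob(_ items: [MediaItem]) async {
    await withTaskGroup(of: Void.self) { group in
        for item in items {
            group.addTask {
                await sharedLimiter.acquire()
                await decode(item)
                await sharedLimiter.release()
            }
        }
    }
}

// Other jobs elsewhere draw from the same sharedLimiter, so in aggregate
// no more than 6 of these tasks ever run at once.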

Wouldn't a hierarchy of limited task groups solve the problem? Or maybe the root group is limited while subgroups are not. I'm not 100% sure it would work as intended, but ideally it should.

The task groups might be started by independent libraries we link with that aren’t aware of each other…

@ktoso Getting a relative/explicit number of cores seems like a good idea!

But this problem already exists, right? We can create as many TaskGroups as we want, and the internal logic of TaskGroup uses the cooperative thread pool efficiently, right? Correct me if I am wrong.

Yes, the problem already exists - but this discussion is about improvements to the API surface, and I just wanted to highlight a real-world problem we have.

As we discussed, accepting an array of async blocks in the API doesn't seem to augur well. Just like AsyncStream passes a "continuation" object into its initializer closure, or withTaskGroup passes a TaskGroup instance to its closure as an inout parameter, we could have a LimitedParallelismTaskGroup type that hands the group to a closure. Each task can be added to the group as required, but when each task actually executes would be decided internally, based on the maxConcurrentTasks passed by the user.

But no one would want to wait until the entire group has finished executing to collect the results, so we can make the type conform to AsyncSequence and publish each result as it is received.

So, the high-level API would look like this:

let taskGroup = LimitedParallelismTaskGroup(maxConcurrentTasks: 2) { group in
    for i in 0..<10 {
        group.submitTask {
            return await doSomeLongRunningTask(i)
        }
    }
}

for await result in taskGroup {
    // do something with result
}

Got it :+1:

TaskGroup already does that, so yeah new groups would likely follow this precedent.

This may depend a lot on why one wants to limit the number of concurrent threads, but if the goal is to avoid flooding the global thread pool, maybe a solution would be to have an API that lets you configure parallelism relative to the size of the thread pool, instead of as a fixed number?

Maybe even with some defaults that scale well across any N threads, like priorities/QoS:

await withTaskGroup(of: Void.self, maxConcurrentUsage: .low) { ... }

For example, .low could be ~25% of the pool's total thread count (1 for devices with fewer than 8 threads, 2 for devices with fewer than 12, ...).

And then have an option to specify a fixed number (e.g. .fixed(6)) as a discouraged escape hatch (yet still useful if someone wants to build their own thing).


Personally, I'd love to have a way of writing performance-oriented parallelism in Swift without needing to drop down to Dispatch. Being able to limit the number of concurrent tasks in a group would be useful, but it's still risky to put any moderately intensive CPU work in a task group unless you know that each block won't take more than a few hundred milliseconds.

I stumble across the need for this in my stuff fairly frequently, so I absolutely welcome the discussion.

As an additional data point: in basically all my use cases it was distinctly NOT about "local CPU" load management, but about some "downstream resource", if you will. I find the default concurrency runtime scheduling does fine for CPU in most cases. So, IMHO, this feature is not primarily about "managing the thread pool load".

One illustrative example:
I find myself having to write some sort of data migration script every now and then, where the basic gist is:

  • stream in a ton of data (from a DB mostly)
  • figure out what to do with it ("local CPU")
  • perform the migration action for each item (typically through some maintenance API on some service or maybe a DB update or similar)

The "local CPU" part (ie: calculating what do do) is essentially negligible compared to the "downstream" operation.

Having a "back-pressured" stream going for one-after-the-other-update is easy, doing "all-at-once" is easy, but setting a "processing width" that is not 1 or "everything, everywhere, all at once" is just so annoying to spell out that I end up not doing it as often as I should.

I understand the hesitation: setting a fixed number for concurrency is a bit crude, especially since there is no easy way to know what the right number is. But, in my experience, a back-pressuring "limited-width parallel processing" funnel is a great complexity reducer in a lot of cases (latency vs. load, QoS-like control, simply "flattening" peaks, ...).

For the migration example above: sure, you could rigorously figure out whether memory, bytes on the wire, parallel connections, transaction congestion, or something else is the bottleneck and fine-tune a sophisticated solution, or just pick a number between 5 and 30 and see what happens. Most likely you'll be better off than using either "1" or "all of them" ; )
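
Using the forEachLimited sketch from earlier in the thread, the migration example reduces to something like this (database, migrationAction, and maintenanceAPI are hypothetical stand-ins):

// Stream rows in, and migrate with a fixed processing width of 10.
try await forEachLimited(database.rows(matching: query), maxConcurrentTasks: 10) { row in
    let action = migrationAction(for: row) // negligible "local CPU" work
    await maintenanceAPI.apply(action)     // the downstream-bound operation
}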

For me, I would prefer something like AsyncSemaphore.
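
For what it's worth, the call-site ergonomics of that could look like the sketch below, where withPermit is a hypothetical helper on the suspension-based limiter sketched earlier in the thread:

extension SuspendingTaskLimiter {
    // Run body while holding a slot, releasing the slot even if body throws.
    func withPermit<T: Sendable>(_ body: @Sendable () async throws -> T) async rethrows -> T {
        await acquire()
        do {
            let result = try await body()
            release()
            return result
        } catch {
            release()
            throw error
        }
    }
}

// Usage: a plain task group, with the semaphore deciding admission.
let semaphore = SuspendingTaskLimiter(maxConcurrentTasks: 3)
await withTaskGroup(of: Void.self) { group in
    for id in 1...10 {
        group.addTask {
            _ = await semaphore.withPermit { await mockTask(id: id) }
        }
    }
}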
