Understanding Parallelization with Task Group

I'm looking to parallelize a simple for loop using Swift concurrency.

Say I do this (note there are no await calls in the task groups):

let array: [Int] = [/* large int array to process */]
let result = await withTaskGroup(of: Int.self, returning: [Int].self) { group in 
    for idx in stride(from: 0, to: array.count, by: 10) {
        group.addTask {
            return idx * Int.random(in: 1...10)
        }
    }
    return await group.reduce(into: [Int](), { $0.append($1) })
}

Can I expect this synchronous code to be parallelized using a task group like this? Or do task groups require their tasks to contain async functions to run in parallel?

3 Likes

I think Swift concurrency is a perfect fit for this kind of job. Each task in the group runs concurrently with the other tasks in the group if they don't interact with each other. However, there is a limit to the number of tasks that can be added to a task group before overloading the cores, causing a thread explosion.

Edit: The last statement above is incorrect. See @mattie's post below.

The short answer is that you do have async functions for your tasks. The closure parameter of addTask is declared async, so the closure you pass is itself an async function.

The longer answer is that it's a bit misleading to think that there's "really" anything known as an "async function":

  1. A function declared with the async keyword is a function which may contain internal suspension points.
  2. Of course, a function with internal suspension points must — as an implementation detail — compile a bit differently to a function without any internal suspension points. In that sense (of course) there really are "async functions", but this sense doesn't really produce a lot of insight to anything but the implementation details.
  3. Some functions may have non-internal implicit suspension points when called in particular ways. For example, an isolated synchronous function in an actor still needs to be called with await from outside the actor, because crossing the actor's isolation boundary is necessarily a suspension point, even when the function itself doesn't contain any.
  4. Another example is the function that represents the body of a Task. In concept, a task is created with an implicit suspension point preceding its body, though for all I know this may just be a "ready to run" thing, rather than a true suspension point. But the principle seems the same to me.
  5. In general, invocation of a function always begins synchronously, and continues to run synchronously until it reaches the function's first suspension point. This might be almost immediately if the first suspension point is an implicit one. If it has no suspension points (internal or implicit), then the function will run to completion synchronously, even if called in what you may think of as an asynchronous context.

So, if you're asking whether each of those closures can be expected to run synchronously at the addTask call, then the answer is no. Tasks you create run "later", for some implementation-dependent meaning of "later".
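For example (a minimal sketch; the exact scheduling is implementation-dependent, but the ordering shown is typical):

// The addTask closure below contains no suspension points, so once it starts
// it runs synchronously to completion. But it does not start at the addTask
// call itself; it starts "later", when the pool gets around to scheduling it.
await withTaskGroup(of: Void.self) { group in
    group.addTask {
        print("task body, running later on a pool thread")
    }
    print("addTask has returned; the closure has not necessarily run yet")
}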

3 Likes

Careful! Swift concurrency does not have the same overcommit behavior that Dispatch does. It will never create more threads than its fixed thread pool already has. So, while you have no risk of thread explosion, you do have to worry about work never running.

@thecoolwinter you might also want to check out this document in the migration guide. It has some interesting discussion around managing large numbers of work items in groups.

https://www.swift.org/migration/documentation/swift-6-concurrency-migration-guide/runtimebehavior

3 Likes

Thank you.

1 Like

I wish that section had mentioned this explicitly.

1 Like

Technically, yes, you can do this.

A major caveat, though: If your synchronous calculation is so slow as to warrant parallelism, then it might be so slow that we would want to keep it out of the Swift concurrency cooperative thread pool. We have a “runtime contract” with Swift concurrency that threads should always be able to make “forward progress”. We should never block a thread from the cooperative thread pool for any prolonged time.

So, if the calculation is just a stand-in for something substantially more complicated, you should consider keeping these intense calculations out of Swift concurrency entirely, and instead use, for example, GCD’s concurrentPerform, and then bridge back to Swift concurrency with a continuation (e.g., withCheckedContinuation) or other equivalent pattern. And if the calculations are simple enough that they won’t necessitate this, then you should benchmark your parallel algorithm against a simple serial calculation, and verify whether there is any performance benefit from parallelism. Often, with simple algorithms, there is not, and parallel implementations can even be slower than their serial counterparts.
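For example, here is a minimal sketch of that bridging pattern (the parallelMap name and the work closure are placeholders for your real per-index calculation):

import Foundation

// Run the CPU-bound work via GCD's concurrentPerform, off the cooperative
// thread pool, and bridge the results back with a checked continuation.
func parallelMap(count: Int, work: @escaping @Sendable (Int) -> Int) async -> [Int] {
    await withCheckedContinuation { continuation in
        DispatchQueue.global(qos: .utility).async {
            var results = [Int](repeating: 0, count: count)
            results.withUnsafeMutableBufferPointer { buffer in
                DispatchQueue.concurrentPerform(iterations: count) { index in
                    buffer[index] = work(index) // each index is written exactly once, so no overlap
                }
            }
            continuation.resume(returning: results)
        }
    }
}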


A few observations:

  • We generally stride in order to chunk our work. In your example, you are striding by 10, which suggests that each task should process 10 items. But you are only doing one calculation per task. I don’t know if that was your intent or not.

  • There is a slight overhead introduced by parallelization and synchronization. To avoid having this offset the performance gains you were hoping to achieve, we generally maximize the amount of work on each task. If you do not have enough work on each task, the modest overhead of parallelization can completely wipe out any performance gains from doing the work in parallel. It is not at all unusual for naive parallelization routines to actually be much slower than their serial counterparts.

    For what it is worth, processing 1 integer (or even 10 of them) per task is unlikely to be nearly enough work per task to enjoy parallelism performance benefits. E.g., if I were processing 1M items, I might consider 100 tasks, each processing 10,000 items, or something of that order of magnitude. And with something as simple as “multiply by a random integer”, even that may be insufficient. (That having been said, I assume your multiplication example was just a proxy for something more complicated.)

  • As an aside, another reason we stride is to avoid false sharing. So, striding by 10 may be insufficient to achieve that.

  • When performing parallelization, note that tasks can finish in an arbitrary order. Usually when processing an ordered collection, you want the results in the same order. Your code snippet will not do that. If order is important, you’d collate your results to get them back in the intended order (see the sketch after this list).
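To make the chunking and reordering points concrete, here is one hedged sketch of a chunked, order-preserving task group (the chunkSize value and the transform closure are placeholders, and this still assumes enough work per chunk to be worthwhile):

// Each child task processes a whole chunk and tags its output with the
// chunk's start index, so results can be reassembled in the original order.
func processInChunks(_ array: [Int], chunkSize: Int, transform: @escaping @Sendable (Int) -> Int) async -> [Int] {
    await withTaskGroup(of: (Int, [Int]).self, returning: [Int].self) { group in
        for start in stride(from: 0, to: array.count, by: chunkSize) {
            let range = start ..< min(start + chunkSize, array.count)
            group.addTask {
                (start, array[range].map(transform))
            }
        }

        // Child tasks finish in arbitrary order, so collate by chunk offset.
        var chunks = Array(repeating: [Int](), count: (array.count + chunkSize - 1) / chunkSize)
        for await (start, values) in group {
            chunks[start / chunkSize] = values
        }
        return chunks.flatMap { $0 }
    }
}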

So, let us assume for a second that we fix the example so that it does enough synchronous work on each task to justify parallelization. The question (and I think this was your original question) is whether this is suitable to do within Swift concurrency. The answer is, in short, no, it might not be.

The problem is that we have a contract with Swift concurrency that threads should always be able to make forward progress. E.g., Swift concurrency: Behind the scenes says:

Recall that with Swift, the language allows us to uphold a runtime contract that threads will always be able to make forward progress. It is based on this contract that we have built a cooperative thread pool to be the default executor for Swift. As you adopt Swift concurrency, it is important to ensure that you continue to maintain this contract in your code as well so that the cooperative thread pool can function optimally.

In short, to enjoy the benefits of parallel execution, you need enough work on each item to offset the modest overhead of parallelism. But this is at odds with our runtime contract with Swift concurrency (namely, that we will never block a thread from the cooperative thread pool). Technically you can periodically await Task.yield() inside your blocking code (to satisfy this runtime contract), but that will completely overwhelm any performance gains achieved by parallelism.
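For what that looks like in practice, a sketch (expensiveCalculation is a hypothetical stand-in for the real synchronous work; as noted, the yields themselves erode the gains):

// Periodically yield inside a CPU-bound loop so other tasks in the
// cooperative pool get a chance to run.
func computeAll(_ items: [Int], expensiveCalculation: (Int) -> Int) async -> [Int] {
    var results: [Int] = []
    results.reserveCapacity(items.count)
    for (index, item) in items.enumerated() {
        results.append(expensiveCalculation(item))
        if index.isMultiple(of: 1_000) {
            await Task.yield() // a suspension point, satisfying the runtime contract
        }
    }
    return results
}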

So, what to do? In Visualize and optimize Swift concurrency, they suggest:

… move that code outside of the concurrency thread pool – for example, by running it on a Dispatch queue – and bridge it to the concurrency world using continuations. Whenever possible, use async APIs for blocking operations to keep the system operating

So, they are recommending that you keep this code within legacy patterns (e.g., concurrentPerform for your parallelized algorithm) and then bridge it back with a continuation.


There are lots of ways to do this. This is one example where I wrap concurrentPerform in an asynchronous sequence of results:

import Foundation

extension DispatchQueue {
    /// Chunked concurrentPerform
    ///
    /// - Parameters:
    ///
    ///   - iterations: How many total iterations.
    ///
    ///   - chunkCount: How many chunks into which these iterations will be divided. This is optional and will default to
    ///      `activeProcessorCount`. If the work is largely uniform, you can safely omit this parameter and the
    ///      work will be evenly distributed amongst the CPU cores.
    ///
    ///      If different chunks are likely to take significantly different amounts of time,
    ///      you may want to increase this value above the processor count to avoid blocking the whole process
    ///      for the slowest chunk and afford threads processing faster chunks the opportunity to handle more than one.
    ///
    ///      But, be careful to not increase this value too high, as each dispatched chunk entails a modest amount of
    ///      overhead. You may want to empirically test different chunk sizes (vs the default value) for your particular
    ///      use-case.
    ///
    ///   - qos: The `DispatchQoS.QoSClass` of the global queue on which the work is performed. Defaults to `.utility`,
    ///      which offers good performance while minimizing preemption of code on higher-priority queues, such as the main queue.
    ///
    ///   - operation: Closure to be called for each chunk. The `Range<Int>` of iterations for the chunk is passed as its parameter.
    ///
    /// - Returns: An asynchronous sequence of the processed chunks.
    static func concurrentPerformResults<T: Sendable>(
        iterations: Int,
        chunkCount: Int? = nil,
        qos: DispatchQoS.QoSClass = .utility,
        operation: @Sendable @escaping (Range<Int>) -> T
    ) -> AsyncStream<(range: Range<Int>, values: T)> {
        AsyncStream { continuation in
            DispatchQueue.global(qos: qos).async {
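                // Divide `iterations` into `chunks` ranges of roughly equal size,
                // rounding the chunk size up so no iteration is dropped.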
                let chunks = Swift.min(iterations, chunkCount ?? ProcessInfo.processInfo.activeProcessorCount)
                let (quotient, remainder) = iterations.quotientAndRemainder(dividingBy: chunks)
                let chunkSize = remainder == 0 ? quotient : quotient + 1

                DispatchQueue.concurrentPerform(iterations: chunks) { chunkIndex in
                    let start = chunkIndex * chunkSize
                    let end = min(start + chunkSize, iterations)
                    let range = start ..< end

                    continuation.yield((range, operation(range)))
                }

                continuation.finish()
            }
        }
    }
}

And then I can do things like:

actor Experiment {
    func performExperiment() async {
        let iterations = 1_000_000
        let values = Array(0..<iterations)
        let sequence = DispatchQueue.concurrentPerformResults(iterations: iterations) { range in
            values[range].map { $0 * 2 }
        }

        var results = Array(repeating: 0, count: iterations)
        for await chunk in sequence {
            results.replaceSubrange(chunk.range, with: chunk.values)
        }
        print(results)
    }
}

And when I profile this, I can see that the chunks finish in a fairly random order (which is why I built the array in advance and used replaceSubrange, to reassemble the results in the correct order).

2 Likes

Yeah, that's a good point. The guide should include some details on which APIs/behaviors are unsafe.

This is not what blocking a thread means. The rule is that you must be doing on-CPU work (like the arithmetic here). “Blocking” is when you’re occupying one of the threads in the thread pool but not actively using it to do work on the CPU, such as when you’re waiting on a semaphore or mutex. You’re expected to have chunks of time on the concurrent thread pool where you are doing work.

As you discussed, it is indeed important to size that work so that the inherent overhead of the concurrency runtime is relatively small in comparison to useful work, while keeping the tasks small enough that the system can distribute work effectively. For example, on Apple Silicon devices, the efficiency cores run slower than the performance cores and as a result can complete fewer batches of work. So it is beneficial to have substantially more batches than there are CPU cores to ensure all cores finish up their work at about the same time.
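As a rough sketch of that sizing (the oversubscription factor of 4 here is an assumption for illustration, not a universal rule):

import Foundation

// Substantially more batches than cores, so performance cores that finish
// their batches early can pick up extra ones while efficiency cores catch up.
let cores = ProcessInfo.processInfo.activeProcessorCount
let batchCount = cores * 4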

It is typically OK if you occasionally block on other work that is currently happening via a mutex, because the system can understand the dependency and boost the priority of the task that is currently holding the lock. But of course, any time spent in lock contention is time not spent doing meaningful work.

6 Likes

The challenge with running highly parallelized, but synchronous, routines in Swift concurrency is that once you exhaust the cooperative thread pool (which is limited to the processor count on your device), they can prevent child tasks from other subsystems from even starting until the cooperative thread pool is freed. I have confirmed these sorts of issues empirically.

The async-await proposal, SE-0296, is fairly specific with regard to long, synchronous calculations, pointing out that the “blocking” considerations apply both when calling a slow synchronous function and when running any “computationally intensive loop”.

So, you can either move these computationally intensive loops out of the Swift concurrency context, or periodically yield to the Swift concurrency system.

Now, all of this depends highly upon what the OP’s computationally intense algorithm is really doing. If it really was just multiplying a million integers, as the code snippet suggested (which took only a few milliseconds on my device with an optimized release build), this may well not be enough work to raise these sorts of concerns (much less warrant parallelization). But I worried that this was just an overly simplified example for the purposes of this question; if it is a significantly more complicated calculation, then the caveat of SE-0296 may be warranted. It is hard to say on the basis of the information provided. But my spidey senses just tingled when I heard the OP use “synchronous” and “parallelized” in the same sentence, as in extreme cases, caution is advised.

3 Likes

For context, the concurrent workload I’m most experienced with has minimal lock contention and is entirely CPU bound. It can take upwards of 30 seconds and works just fine using a task group and not concurrentPerform since it is broken up into reasonably small chunks. But if the individual chunks took more than a hundred milliseconds or so I would definitely make them smaller!

1 Like

Wow, thank you everyone who took the time to respond to this. I should clarify the initial problem. The example I gave is trivial, but in the real problem I was looking to understand better, the 'data' array is an array of structures that each have a long-running, CPU-bound calculation to do. I was also working on the assumption that the data and calculation have the 'golden ticket' of parallelization (the calculation is commutative, result order doesn't matter, all data are immutable). That being said, a lot of the information provided by everyone has helped my understanding of the Swift concurrency system, so thank you.

What I think I understand now is:

  • All functions passed to a task group will be run in 'parallel' on the thread pool if possible. However, it's important to keep in mind the promise to the Swift concurrency thread pool that we'll always be able to make forward progress.
  • To resolve that problem, it's usually best to move the work outside of the thread pool (e.g., using GCD), or to add yields in the synchronous task code so that Swift has a chance to give other work time on the CPU.
  • Thread explosion isn't possible, but memory explosion is if you create too many tasks in a group. So when using a task group, the programmer needs to throttle the number of tasks created.
2 Likes