Understanding Parallelization with Task Group

I'm looking to parallelize a simple for loop using Swift concurrency.

Say I do this (note there are no await calls in the task groups):

let array: [Int] = [/* large int array to process */]
let result = await withTaskGroup(of: Int.self, returning: [Int].self) { group in 
    for idx in stride(from: 0, to: array.count, by: 10) {
        group.addTask {
            return idx * Int.random()
        }
    }
    return await group.reduce(into: [Int](), { $0.append($1) })
}

Can I expect this synchronous code to be parallelized using a task group like this? Or do task groups require their tasks to contain async functions to run in parallel.

3 Likes

I thinkSwift concurrency is a perfect fit for this kind of job. Each task in the group runs concurrently with other tasks in the group if they don't interact with each other. However, there is a limit to the number of tasks that can be added to a task group before overloading the cores, causing a thread explosion.

Edit: The last statement above is incorrect. See @mattie's post below.

The short answer is that you do have async functions for your tasks. The closure parameter of addTask is declared async, and your closure argument is a function, hence it's an async function.

The longer answer is that it's a bit misleading to think that there's "really" anything known as an "async function":

  1. A function declared with the async keyword is a function which may contain internal suspension points.
  2. Of course, a function with internal suspension points must — as an implementation detail — compile a bit differently to a function without any internal suspension points. In that sense (of course) there really are "async functions", but this sense doesn't really produce a lot of insight to anything but the implementation details.
  3. Some functions may have non-internal implicit suspension points when called in particular ways. For example, an isolated synchronous function in an actor still needs to be called with await from outside the actor, because crossing the actor's isolation boundary is necessarily a suspension point, even when the function itself doesn't contain any.
  4. Another example is the function that represents the body of a Task. In concept, a task is created with an implicit suspension point preceding its body, though for all I know this may just be a "ready to run" thing, rather than a true suspension point. But the principle seems the same to me.
  5. In general, invocation of a function always begins synchronously, and continues to run synchronously until it reaches the function's first suspension point. This might be almost immediately if the first suspension point is an implicit one. If it has no suspension points (internal or implicit), then the function will run to completion synchronously, even if called in what you may think of as an asynchronous context.

So, if you're asking whether each of those closures can be expected to run synchronously at the addTask call, then the answer is no. Tasks you create run "later", for some implementation-dependent meaning of "later".

4 Likes

Careful! Swift concurrency does not have a same overcommit behavior that dispatch does. It will never create more threads than its fixed thread pool already has. So, while you have no risk of thread explosion, you do have to worry about work not ever running.

@thecoolwinter you might also want to check out this document in the migration guide. It has some interesting discussion around managing large numbers of work items in groups.

https://www.swift.org/migration/documentation/swift-6-concurrency-migration-guide/runtimebehavior

5 Likes

Thank you.

1 Like

I wish that section had mentioned this explicitly:

1 Like

Technically, yes, you can do this.

A major caveat, though: If your synchronous calculation is so slow as to warrant parallelism, then it might be so slow that we would want to keep it out of the Swift concurrency cooperative thread pool. We have a “runtime contract” with Swift concurrency that threads should always be able to make “forward progress”. We should never block a thread from the cooperative thread pool for any prolonged time.

So, if the calculation is just a stand-in for something substantially more complicated, you should consider keeping these intense calculations out of Swift concurrency entirely, and instead use, for example, GCD’s concurrentPerform, and then bridge back to Swift concurrency with a continuation (e.g., withCheckedContinuation) or other equivalent pattern. And if the calculations are so simple enough that they won’t necessitate this, then you should benchmark your parallel algorithm against a simple serial calculation, and verify if there is any performance benefit from parallelism. Often, with simple algorithms, there is not, and parallel implementations can even be slower than their serial counterparts.


A few observations:

  • We generally stride in order to chunk our work. In your example, if you are striding by 10. That means that each iteration would likely process 10 items each. But, you are only doing one calculation per task. I don’t know if that was your intent or not.

  • There is a slight overhead introduced by parallelization and synchronization. To avoid having this offset any the performance gains you were hoping to achieve, we generally maximize the amount of work on each task. If you do not have enough work on each task, the modest overhead of parallelization can completely wipe out any performance gains from doing the work in parallel. It is not at all unusual to have naive parallelization routines to actually be much slower than their serial counterparts.

    For what it is worth, processing 1 integer (or even 10 of them) per task is unlikely to be nearly enough work per task to enjoy parallelism performance benefits. E.g., if I was processing 1m items, I might consider 100 tasks, each processing 10,000 items, or something of that order of magnitude. And with something as simple as “multiply by a random integer”, even that may be insufficient. (That having been said, I assume your multiplication example was just a proxy for something more complicated.)

  • As an aside, we also stride is to avoid false sharing. So, striding by 10 may be insufficient to achieve that.

  • When performing parallelization, note that tasks can finish in an arbitrary order. Usually when processing an ordered collection, you want the results in the same order. Your code snippet will not do that. If order is important, you’d collate your results to get the results back in the intended order.

So, let us assume for a second that we fix the example so that it does enough synchronous work on each task to justify parallelization. The question (and I think this was your original question), is whether this is suitable to do within Swift concurrency. The answer is, in short, no, it might not be.

The problem is that we have a contract with Swift concurrency that threads should always be able to make forward progress. E.g., Swift concurrency: Behind the scenes says:

Recall that with Swift, the language allows us to uphold a runtime contract that threads will always be able to make forward progress. It is based on this contract that we have built a cooperative thread pool to be the default executor for Swift. As you adopt Swift concurrency, it is important to ensure that you continue to maintain this contract in your code as well so that the cooperative thread pool can function optimally.

In short, to enjoy the benefits of parallel execution, you need enough work on each item to offset the modest overhead of parallelism. But this is at odds with our runtime contract with Swift concurrency (namely, that we will never block a thread from the cooperative thread pool). Technically you can periodically await Task.yield() inside your blocking code (to satisfy this runtime contract), but that will completely overwhelm any performance gains achieved by parallelism.

So, what to do? In Visualize and optimize Swift concurrency, they suggest:


 move that code outside of the concurrency thread pool– for example, by running it on a Dispatch queue– and bridge it to the concurrency world using continuations. Whenever possible, use async APIs for blocking operations to keep the system operating

So, they are recommending that you keep this code within legacy patterns (e.g., concurrencyPerform for your parallelized algorithm) and then bridge it back with a continuation.


There are lots of ways to do this. This is one example where I wrap concurrentPerform in an asynchronous sequence of results:

extension DispatchQueue {
    /// Chunked concurrentPerform
    ///
    /// - Parameters:
    ///
    ///   - iterations: How many total iterations.
    ///
    ///   - chunkCount: How many chunks into which these iterations will be divided. This is optional and will default to
    ///      `activeProcessorCount`. If the work is largely uniform, you can safely omit this parameter and the
    ///      work will evenly distributed amongst the CPU cores.
    ///
    ///      If different chunks are likely to take significantly different amounts of time,
    ///      you may want to increase this value above the processor count to avoid blocking the whole process
    ///      for slowest chunk and afford the opportunity for threads processing faster chunks to handle more than one.
    ///
    ///      But, be careful to not increase this value too high, as each dispatched chunk entails a modest amount of
    ///      overhead. You may want to empirically test different chunk sizes (vs the default value) for your particular
    ///      use-case.
    ///
    ///   - qos: The `DispatchQoS` of the work to be performed. Defaults to `.utility` which offers good performance,
    ///      while minimizing preemption of code on higher priority queues, such as the main queue.
    ///
    ///   - operation: Closure to be called for each chunk. The `Range<Index>` is parameter.
    ///
    /// - Returns: An asynchronous sequence of the processed chunks.

    static func concurrentPerformResults<T: Sendable>(
        iterations: Int,
        chunkCount: Int? = nil,
        qos: DispatchQoS.QoSClass = .utility,
        operation: @Sendable @escaping (Range<Int>) -> T
    ) -> AsyncStream<(range: Range<Int>, values: T)> {
        AsyncStream { continuation in
            DispatchQueue.global(qos: qos).async {
                let chunks = Swift.min(iterations, chunkCount ?? ProcessInfo.processInfo.activeProcessorCount)
                let (quotient, remainder) = iterations.quotientAndRemainder(dividingBy: chunks)
                let chunkSize = remainder == 0 ? quotient : quotient + 1

                DispatchQueue.concurrentPerform(iterations: chunks) { chunkIndex in
                    let start = chunkIndex * chunkSize
                    let end = min(start + chunkSize, iterations)
                    let range = start ..< end

                    continuation.yield((range, operation(range)))
                }

                continuation.finish()
            }
        }
    }
}

And then I can do things like:

actor Experiment {
    func performExperiment() async {
        let iterations = 1_000_000
        let values = Array(0..<iterations)
        let sequence = DispatchQueue.concurrentPerformResults(iterations: iterations) { range in
            values[range].map { $0 * 2 }
        }

        var results = Array(repeating: 0, count: iterations)
        for await chunk in sequence {
            results.replaceSubrange(chunk.range, with: chunk.values)
        }
        print(results)
    }
}

And when I profile this:

Note that these finish in a fairly random order (which is why I built the array and used replaceSubrange, to reassemble the results in the correct order).

4 Likes

Yeah that's a good point. The guide should include some details on which APIs/behaviors are unsafe.

This is not what blocking a thread means. The rule is that you must either be doing on-CPU work (like arithmetic here). “Blocking” is when you’re occupying one of the threads in the thread pool but not actively using it to do work on the CPU, such as when you’re waiting on a semaphore or mutex. You’re expected to have chunks of time on the concurrent thread pool where you are doing work.

As you discussed, it is indeed important to size that work so that the inherent overhead of the concurrency runtime is relatively small in comparison to useful work, while keeping the tasks small enough that the system can distribute work effectively. For example, on Apple Silicon devices, the efficiency cores run slower than the performance cores and as a result can complete fewer batches of work. So it is beneficial to have substantially more batches than there are CPU cores to ensure all cores finish up their work at about the same time.

it is typically ok if you occasionally block on other work that is currently happening via a mutex because the system can understand the dependencies and boost the priority of the task that is currently holding the lock. But of course any time spent in lock contention is time not spent doing meaningful work.

8 Likes

The challenge with running highly parallelized, but synchronous, routines in Swift concurrency is that once you exhaust the cooperative thread pool (which is limited to the processor count on your device) these can prevent child tasks from other subsystems from even starting until the cooperative thread pool is freed. I have confirmed these sorts of issues empirically.

The async-await proposal, SE-0296, is fairly specific in regards to long, synchronous calculations, pointing out that the “blocking” considerations apply both when calling some synchronous function as well as any “computationally intensive loop”:

So, you can either move these computationally intensive loops out of the Swift concurrency context, or periodically yield to the Swift concurrency system.

Now, all of this depends highly upon what the OP’s computationally intense algorithm is really doing. If it really was just multiplying a million integers as the code snippet suggested (which took only a few milliseconds on my device with an optimized release build) this may well just not be enough work to raise these sorts of concerns (much less, warrant parallelization). But I worried that this was just an overly simplified example for the purposes of this question; if it was significantly more complicated calculation, then the caveat of SE-0296 may be warranted. It is hard to say on the basis of the information provided. But my spidey senses just tingled when I heard the OP use “synchronous” and “parallelized” in the same sentence, as in extreme cases, caution is advised.

3 Likes

For context, the concurrent workload I’m most experienced with has minimal lock contention and is entirely CPU bound. It can take upwards of 30 seconds and works just fine using a task group and not concurrentPerform since it is broken up into reasonably small chunks. But if the individual chunks took more than a hundred milliseconds or so I would definitely make them smaller!

2 Likes

Wow, thank you everyone who took the time to respond to this. I should clarify the initial problem. The example I gave is a trivial example, but in the real problem I was looking to understand better the 'data' array is an array of structures that have a long-running CPU-bound calculation to do. I was also working on the assumption that the data and calculation has the 'golden ticket' of parallelization (calculation is commutative, result order doesn't matter, all data are immutable). That being said a lot of the information provided by everyone has helped my understanding of the Swift concurrency system so thank you.

What I think I understand now is:

  • All functions passed to a task group will be run in 'parallel' in the thread pool if possible. However, it's important to keep in mind the promise to the Swift concurrency thread pool that we'll always be able to make forward progress.
  • To resolve that problem, it's usually best to move the work outside of the thread pool (eg: using GCD). Or by adding yields in the synchronous task code so that Swift has a chance to give other work time on the CPU.
  • Thread explosion isn't possible, but memory explosion is with over-initializing Tasks in a group. So if using a task group the programmer needs to throttle the amount of tasks created.
2 Likes

@robert.ryan
Hi Robert,

I found this thread while researching parallel processing in Swift.
Thank you for your explanation!
However, I do not observe expected time difference while utilizing more core. In fact, don’t see any speed up.

I ran your performExperiment() in a timed loop with chunkCount from 1 to 14, with no measurable time difference:

activeProcessorCount: 14
Processed in 1 chunks: 17.982080541 seconds
Processed in 2 chunks: 16.349675958000002 seconds
Processed in 3 chunks: 15.870773375 seconds
Processed in 4 chunks: 15.691222457999999 seconds
Processed in 5 chunks: 15.562732959 seconds
Processed in 6 chunks: 15.465004083 seconds
Processed in 7 chunks: 15.5559345 seconds
Processed in 8 chunks: 15.588203292 seconds
Processed in 9 chunks: 15.652383209 seconds
Processed in 10 chunks: 15.677385541 seconds
Processed in 11 chunks: 15.636714416 seconds
Processed in 12 chunks: 15.526148833999999 seconds
Processed in 13 chunks: 15.519439375000001 seconds
Processed in 14 chunks: 15.482482916 seconds

Here is how I timed your code:

actor Experiment {
    func performExperiment() async {
        let clock = ContinuousClock()
        let procCount = ProcessInfo.processInfo.activeProcessorCount
        print("activeProcessorCount: \(procCount)")
        for chunkCount in 1... procCount {
            let start = clock.now
            
            let iterations = 100_000_000
            let values = Array(0..<iterations)
            let sequence = DispatchQueue.concurrentPerformResults(iterations: iterations, chunkCount: chunkCount) { range in
                values[range].map { $0 * 2 }
            }
            
            var results = Array(repeating: 0, count: iterations)
            for await chunk in sequence {
                results.replaceSubrange(chunk.range, with: chunk.values)
            }
            print(results.count)
            print("Processed in \(chunkCount) chunks: \(clock.now - start)")
        }
    }
}

Note that I am printing just the count at the end; your print(result) froze my system :)

What am I doing wrong? Or I have wrong expectations?
Thank you!

Vlad

1 Like

@VladFein – The problem is that there is not enough work being performed on each thread to significantly offset the (admittedly modest) overhead of parallelization. (Sometimes a simple serial rendition will be faster!)

You will see significant benefits from parallelism where there is some computationally intensive process. E.g., let us consider a contrived example, where I am going to a far more complicated calculation (a deliberately inefficient calculation of pi) for each element in the array:

func performExperiment() async {
    let clock = ContinuousClock()
    let processorCount = ProcessInfo.processInfo.activeProcessorCount
    print("activeProcessorCount: \(processorCount)")
    for chunkCount in 1 ... processorCount * 2 {
        let start = clock.now

        let iterations = 1_000
        let values = Array(0..<iterations)
        let sequence = DispatchQueue.concurrentPerformResults(iterations: iterations, chunkCount: chunkCount) { range in
            values[range].map { Double($0) * self.calculatePi(iterations: 1_000_000) }
        }

        var results = Array(repeating: 0.0, count: iterations)
        for await chunk in sequence {
            results.replaceSubrange(chunk.range, with: chunk.values)
        }

        let elapsed = start.duration(to: .now)
        print("Processed in \(chunkCount) chunks: \(elapsed)")
    }
}

nonisolated func calculatePi(iterations: Int) -> Double {
    var result = 0.0
    var sign = 1.0
    for i in 0 ..< iterations {
        result += sign / Double(i * 2 + 1)
        sign *= -1
    }
    return result * 4
}

extension DispatchQueue {
    /// Chunked concurrentPerform
    ///
    /// - Parameters:
    ///
    ///   - iterations: How many total iterations.
    ///
    ///   - chunkCount: How many chunks into which these iterations will be divided. This is optional and will default to
    ///      `activeProcessorCount`. If the work is largely uniform, you can safely omit this parameter and the
    ///      work will evenly distributed amongst the CPU cores.
    ///
    ///      If different chunks are likely to take significantly different amounts of time,
    ///      you may want to increase this value above the processor count to avoid blocking the whole process
    ///      for slowest chunk and afford the opportunity for threads processing faster chunks to handle more than one.
    ///
    ///      But, be careful to not increase this value too high, as each dispatched chunk entails a modest amount of
    ///      overhead. You may want to empirically test different chunk sizes (vs the default value) for your particular
    ///      use-case.
    ///
    ///   - qos: The `DispatchQoS` of the work to be performed. Defaults to `.utility` which offers good performance,
    ///      while minimizing preemption of code on higher priority queues, such as the main queue.
    ///
    ///   - operation: Closure to be called for each chunk. The `Range<Index>` is parameter.
    ///
    /// - Returns: An asynchronous sequence of the processed chunks.

    static func concurrentPerformResults<T: Sendable>(
        iterations: Int,
        chunkCount: Int? = nil,
        qos: DispatchQoS.QoSClass = .utility,
        operation: @Sendable @escaping (Range<Int>) -> T
    ) -> AsyncStream<(range: Range<Int>, values: T)> {
        AsyncStream { continuation in
            DispatchQueue.global(qos: qos).async {
                let chunks = Swift.min(iterations, chunkCount ?? ProcessInfo.processInfo.activeProcessorCount)
                let (baseChunkSize, remainder) = iterations.quotientAndRemainder(dividingBy: chunks)

                DispatchQueue.concurrentPerform(iterations: chunks) { chunkIndex in
                    let start = chunkIndex * baseChunkSize + min(chunkIndex, remainder)
                    let end = start + baseChunkSize + (chunkIndex < remainder ? 1 : 0)
                    let range = start ..< end

                    continuation.yield((range, operation(range)))
                }

                continuation.finish()
            }
        }
    }
}

Then, you start to see some significant performance gains achieved through parallelism:

activeProcessorCount: 20
Processed in 1 chunks: 0.978234333 seconds
Processed in 2 chunks: 0.491440917 seconds
Processed in 3 chunks: 0.335044417 seconds
Processed in 4 chunks: 0.250512834 seconds
Processed in 5 chunks: 0.200163291 seconds
Processed in 6 chunks: 0.167424667 seconds
Processed in 7 chunks: 0.143622000 seconds
Processed in 8 chunks: 0.124962458 seconds
Processed in 9 chunks: 0.112275750 seconds
Processed in 10 chunks: 0.102217708 seconds
Processed in 11 chunks: 0.092624750 seconds
Processed in 12 chunks: 0.087693916 seconds
Processed in 13 chunks: 0.086108000 seconds
Processed in 14 chunks: 0.078064375 seconds
Processed in 15 chunks: 0.068986333 seconds
Processed in 16 chunks: 0.067172459 seconds
Processed in 17 chunks: 0.069006000 seconds
Processed in 18 chunks: 0.069018917 seconds
Processed in 19 chunks: 0.069689542 seconds
Processed in 20 chunks: 0.064455000 seconds


Processed in 40 chunks: 0.058316708 seconds

The bottom line is that you need enough work on each thread to achieve material performance benefits from parallelism.


As an aside, please note a fix in the calculation of the start and end of the strides in concurrentPerformResults.

4 Likes

@robert.ryan Thank you VERY much for your help!
I was able to see similar scaling on my mini m4, but I had to drop self.calculatePi(iterations: 10_000) to 10K (vs your 1M) to get ~1 second duration. What monster are you running on? :slight_smile:

If I may, I have a follow up question. My Xcode build for iPad runs 1 chunk in 1.1 sec in simulator. When I move the same exact code into a Mac CLI project, it takes 1.4 sec, and 10 chunks take more than twice compared to simulator.
Could you please explain this?

@robert.ryan Hi Robert, I still can't make sense of the performance scaling...

I'm trying to calculate the number of prime numbers in a range. Using naive implementation, just to spend some time there:

    nonisolated func countPrimes(range: Range<Int>) -> Int {
        let start = clock.now
        var count = 0
        for n in range {
            if isPrime(n) { count += 1 }
        }
        let elapsed = start.duration(to: .now)
        print("countPrimes(\(range)): \(count) in \(elapsed)")
        return count
    }
    
    nonisolated func isPrime(_ n: Int) -> Bool {
        if n < 2 { return false }
        let squareRoot = Int(sqrt(Double(n)))
        for divisor in stride(from: 2, through: squareRoot, by: 1) {
            if n % divisor == 0 {
                return false
            }
        }
        return true
    }

Here are your modified functions:

    func performExperiment() async {
        let clock = ContinuousClock()
        let processorCount = ProcessInfo.processInfo.activeProcessorCount
        print("activeProcessorCount: \(processorCount)")
        for chunkCount in [1, 2, 5, 10] {
            let start = clock.now
            
            let iterations = 2_000_000
            var values = [Int]() //Array(0..<iterations)
            let sequence = DispatchQueue.concurrentPerformResults(iterations: iterations, chunkCount: chunkCount) { range in
                values.append(self.countPrimes(range: range))
            }
            
            for await _ in sequence {
            }
            
            let elapsed = start.duration(to: .now)
            print("Processed in \(chunkCount) chunks [\(values.reduce(0, +))]: \(elapsed)")
        }
    }
    static func concurrentPerformResults<T: Sendable>(
        iterations: Int,
        chunkCount: Int? = nil,
        qos: DispatchQoS.QoSClass = .utility,
        operation: @Sendable @escaping (Range<Int>) -> T
    ) -> AsyncStream<(ind: Int, values: T)> {
        AsyncStream { continuation in
            DispatchQueue.global(qos: qos).async {
                let chunks = Swift.min(iterations, chunkCount ?? ProcessInfo.processInfo.activeProcessorCount)
                let (baseChunkSize, remainder) = iterations.quotientAndRemainder(dividingBy: chunks)

                DispatchQueue.concurrentPerform(iterations: chunks) { chunkIndex in
                    let start = chunkIndex * baseChunkSize + min(chunkIndex, remainder)
                    let end = start + baseChunkSize + (chunkIndex < remainder ? 1 : 0)
                    let range = start ..< end

                    continuation.yield((chunkIndex, operation(range)))
                }

                continuation.finish()
            }
        }
    }

and here are the results:

activeProcessorCount: 14
countPrimes(0..<2000000): 148933 in 3.548041667 seconds
Processed in 1 chunks [148933]: 3.548563584 seconds

countPrimes(0..<1000000): 78498 in 1.550708708 seconds
countPrimes(1000000..<2000000): 70435 in 2.322831709 seconds
Processed in 2 chunks [148933]: 2.322942292 seconds

countPrimes(0..<400000): 33860 in 0.839972208 seconds
countPrimes(400000..<800000): 30091 in 1.166854292 seconds
countPrimes(800000..<1200000): 28987 in 1.331159542 seconds
countPrimes(1200000..<1600000): 28189 in 1.433465167 seconds
countPrimes(1600000..<2000000): 27806 in 1.518825834 seconds
Processed in 5 chunks [148933]: 1.519066792 seconds

countPrimes(0..<200000): 17984 in 0.787754709 seconds
countPrimes(200000..<400000): 15876 in 1.062655708 seconds
countPrimes(400000..<600000): 15238 in 1.231948417 seconds
countPrimes(600000..<800000): 14853 in 1.344216125 seconds
countPrimes(800000..<1000000): 14547 in 1.4335795 seconds
countPrimes(1000000..<1200000): 14440 in 1.476507542 seconds
countPrimes(1200000..<1400000): 14188 in 1.512319625 seconds
countPrimes(1400000..<1600000): 14001 in 1.545897709 seconds
countPrimes(1600000..<1800000): 13945 in 1.558909292 seconds
countPrimes(1800000..<2000000): 13861 in 1.582667084 seconds
Processed in 10 chunks [148933]: 1.584449042 seconds

As you can see, 1 to 2 scales OK, to 5 - not so much, but 10 gives no benefit over 5.
What am I missing?
With 5 chunks I process 400,000 in 0.83 sec.
With 10, in 0.79 + 1.06 = 1.85 sec. More than double!
Is it still not enough load???

Thank you for looking at this!

@VladFein – I’m running on a 2022 M1 Mac Studio.

I wonder if you are using a “debug” build. I used an optimized/release build (e.g., edit scheme, ⌘-<, and make sure to use a “Run” » “Build configuration” » “Release”).

1 Like

A few things:

  1. Probably unrelated, but your performExperiment with isPrime is not thread-safe. You should not append to an array from inside concurrentPerform (as multiple threads are updating the same array). I might recommend reducing the sequence directly:

    func performExperimentPrimes() async {
        let clock = ContinuousClock()
        let processorCount = ProcessInfo.processInfo.activeProcessorCount
        print("activeProcessorCount: \(processorCount)")
        for chunkCount in [1, 2, 5, 10, 20, 40, 80] {
            let start = clock.now
    
            let iterations = 2_000_000
    
            let sequence = DispatchQueue.concurrentPerformResults(iterations: iterations, chunkCount: chunkCount) { range in
                self.countPrimes(range: range)
            }
    
            let count = await sequence.reduce(0) { $0 + $1.values }
    
            let elapsed = start.duration(to: .now)
            print("Processed in \(chunkCount) chunks [\(count)]: \(elapsed)")
        }
    }
    
  2. I tested in on my M1 Mac Studio and these were my results:

    activeProcessorCount: 20
    Processed in 1 chunks [148933]: 0.148517125 seconds
    Processed in 2 chunks [148933]: 0.078614666 seconds
    Processed in 5 chunks [148933]: 0.034502667 seconds
    Processed in 10 chunks [148933]: 0.017716916 seconds
    Processed in 20 chunks [148933]: 0.011895292 seconds
    Processed in 40 chunks [148933]: 0.010299875 seconds
    Processed in 80 chunks [148933]: 0.008900709 seconds
    

    Again, use optimized, “release” builds. My Mac might have more processors than yours, but your processors are, individually, much faster than mine. You are clearly using “debug” builds.

  3. As an aside, the isPrime algorithm is problematic because it takes longer for larger numbers (thus the later ranges will always take longer than earlier ones). Ideally, parallelization should attempt to evenly distribute the work across the threads, but isPrime will take longer for later intervals.

    This is why when I increased the number of chunks above my processor count, I still continued to enjoy performance improvements, because the work load was not evenly distributed. This trick, though is at a negligible cost of increased overhead.

    But this uneven distribution of work will slightly diminish the performance gains that parallelism can achieve.

  4. Note that not all cores on your M4 Mac Mini are created equal. It has a mix of performance cores and efficiency cores. So, if the number of chunks exceeds the number of performance cores, you will likely have chunks running on these efficiency cores and the entire process will not finish until the slowest one (likely doing the most complex work; see point 3) finishes, while other processors sit idle.

    As I suggested above, I might suggest increasing the chunk count: I find that frequently 2-4× the number of active processors to be a good starting point. You should experiment with other values, as the performance characteristics can change based upon the precise calculations being performed. But try a number of chunks that exceeds the number of active processors.

    The goal is to experimentally identify the sweet spot for the granularity of the chunks. If the chunks are too large (e.g., resulting in a number of active chunks equaling, or less than, the number of active processors on the device), you will find that some cores will finish early and overall the process will have to wait for the slowest core while other processors sit idle. However, if the chunks are too small (e.g., thousands of chunks or more), the overhead of parallelism will eventually dominate, adversely affecting the overall performance. You just have to try different values.

2 Likes

@robert.ryan

  1. Although not related to my problem at hand, I greatly appreciate your point, will use it in my real project.
  2. I was in fact using debug build. That IS embarrassing as for decades I told everyone not to time debug builds :( In my defense, I thought that the time penalty would be proportional. Clearly, I was wrong.
  3. 100% agree. I knew that going in, just chose a simple example that I could understand. Also, besides looking at the total processing time, I summed up the times spent in each thread, to get "core hours" per task. That clearly shows some overhead, but the performance gain is obvious.
    I got overhead calculated as:
    1% for 2 threads
    11% for 5
    15% for 10
    Not sure what would be reasonable to expect, but I will take it.

I am very grateful for you sharing your time and expertise!

3 Likes