How to parallelize across CPU cores?

Ah, perhaps I did not adequately explain the problem. Every call to findBestGroupStartingWith uses its own independent copy of the entire array of items, which it will mutate as it works. The difference between calls is entirely down to which item it takes as the first in the group.

The work done by each of those calls is essentially independent, except for needing to get the current lowest-cost-yet when it starts running.

As a very rough analogy, you can think of them as “find the highest-scoring scrabble word that starts with A on the current board”, then “find the highest-scoring scrabble word that starts with B on the current board”, etc.

In that analogy, as it tries to string together letters, it might at some point recognize “these starting letters cannot possibly produce a higher score than what we’ve already seen” which lets it prune the search tree.

Of course the real problem is much more complex, but that’s the general idea.

2 Likes

Oh, that’s interesting.

For this specific problem I’m not sure that matters, since I’m basically writing a program that does nothing but crank out this computation.

But I would be interested to learn the recommended approach using GCD.

Sure, I'm suggesting changing findBestGroupStartingWith to not work that way.

With your analogy it would be "find the highest scoring word in the first row of the scrabble board", "find the highest scoring word in the second…", etc..., followed by "then find the highest word of the ones found by the previous step"

Now, with scrabble that wouldn't work, because words can be arranged arbitrarily and cross rows. If your problem is such that each or any comparison may require referencing arbitrary other parts of the array, then yes, this approach won't work.

(Sharing mutable state is simple enough, just use Mutex. But you'll want to minimize your accesses to it, since it'll reduce parallelism any time there's contention on it)

I’m not sure I follow.

In this analogy, the array of items would be the letters on the player’s rack. So by chunking those you’d end up with “Find the highest-scoring word that only uses the letters ABC” then “Find the highest-scoring word that only uses the letters DEF”, etc.

Okay, I’ve never used that before, where can I find documentation and examples?

Works pretty much like you'd expect if you're familiar with mutexes from other languages, aside from the "it encapsulates the state it protects" bit.

1 Like

Thanks!

I’m aware of the concept of a mutex, but I’ve never used one.

I’m asking questions to try to find out what code I should be writing, because I do not know how to make use of the available processing power on a multicore device.

I have a workload that is naturally made up of a large number of nearly-independent tasks that crank through an algorithm as fast as possible. The only interdependency between them is a single variable which each task must read once when it starts, and might need to write once when it ends.

I want this to use all the available processing cores on whatever machine it runs on, and that seems like it should be a fairly standard use-case for concurrency. But I haven’t done anything like this before and I don’t know what code I need to write to make it happen.

So I’m asking for help. What is the recommended way to do this?

1 Like

The group is an AsyncSequence, so I think you can just call max() on it:

let bestResult = await withTaskGroup(…) { group in
  for chunk in chunks {
    await group.addTask { 
      findBestGroupStartingWith(chunk)
    }
  }

  return await group.max()
}

Edit: NVM, neither the original approach nor this one apply, if the "best so far" value is needed as an input to newly started work items.

I'll elaborate on David's point:

There are several reasons why you'd want to avoid this.

First, as your main topic emphasizes, you wouldn't want to have your program have to worry about the specific core count. It would be nice to asbtract that away.

More importantly, matching the chunk count to the core count leads to a performance cliff if the system has any other programs competing for CPU.

For a graphical demonstration, picture this timeline. We have 100 units of work (each =) split into 4 chunk of 25, with an ideal allocation among on 4 cores:

Core 1: [=========================] D
Core 2: [=========================] O
Core 3: [=========================] N
Core 4: [=========================] E

Look what happens if even a single core is busy with something else:

Core 1: [xxxxxxxxxx][=============================] D
Core 2: [=========================][ ... idle ... ] O
Core 3: [=========================][ ... idle ... ] N
Core 4: [=========================][ ... idle ... ] E

Some other program x was using core 1 for some time. Your program gets to use it eventually, but that chunk is behind. The other 3 cores will finish earlier, but your overall time will have to be as slow as the most delayed task's completion. Worse yet, cores 2-4 are idle while they wait fore core 1.

You might consider the other extreme, which is to split 100 work items into 100 single-item chunks. This prevents work-starvation like in the previous examples:

Core 1: [xxxxxxxxxx][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=] D
Core 2: [=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=] O
Core 3: [=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=] N
Core 4: [=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=][=] E

Now all the cores are maximally saturated, and they all finish at the same time, even with the disruption caused by x! You're never stuck waiting on for a straggler to finish while the other cores are all empty, but now there's a different downside: higher overhead. The repeated [ ] brackets are an apt visual metaphor for scheduling and context-switching overhead. The more fine-grained your chunks are, the more overhead you have in proportion to "real work".

So in practice, you'll want to right-size the chunks that's fine-grained enough to schedule work effectively, but not so fine grained that you pay too much overhead. It's a trade off.

I'll also shout out this great point by Jed fox:

9 Likes

That sounds like either a lock-slash-mutex or a task-local value could get you started in the right direction. I guess a semi-important question would be if you need to conform to Swift 6 strict concurrency or not. I think the mutex solution might cause you problems if you want Swift 6… task-local values might help you out there.

The WWDC lectures linked here come with code samples. Have you made any progress through those lectures?

1 Like

I don’t have time to try this now, but swift-atomics look ideal here.

It would preferable over Mutex (or any other kind of non-barrier lock), because the reads don’t need to be in a critical region, only the writes (which only happen if a new best value is found). That way there’s only ever contention for those really fast single-integer writes, which are uncommon to begin with.

4 Likes

I’ve just finished watching the three linked videos:

Swift concurrency: Behind the scenes
Beyond the basics of structured concurrency
Visualize and optimize Swift concurrency

There’s a lot of good information in them, so thank you and @robert.ryan for sharing them.

I’m still rather confused about what I ought to be doing though. Most of the examples from the videos involve tasks that await external things like network or database access, and can thus “take turns” while idle.

My workload is just non-stop computation.

The closest example from those videos seems to be the parallelized file-compressor in “Visualize and optimize Swift concurrency”. For that, they use an actor to encapsulated the shared state, a non-isolated function to avoid staying on the actor, and detached tasks to avoid inheriting the actor context. Plus additional tasks that run on the main actor for progress updates.

I’m not sure how much of that is applicable to my situation, but there are a lot of, uh, parallels: I do have shared mutable state that is only rarely accessed, I do have independent computation-heavy work items, and I would like to provide progress updates (eg. printing to the console).

Is a design like that with actors and detached tasks really the way I should be going? Somehow it seems a bit, well, heavy for what I’m doing.

When you say “single-integer writes”, does that mean atomics will only work if the shared mutable state is a single integer?

The “lowest cost yet” value in my problem is not a single integer, it is actually an array of integers. But perhaps that could still work, because an array has the memory representation of a pointer and thus fits in an integer?

1 Like

Unless you’re finding that a mutex is taking a significant amount of time in your code I’d recommend avoiding atomics. They’re fairly complicated and not very intuitive to use correctly (in comparison with a Mutex which is just mutex.withLock { /* do stuff with $0 */ }). As long as you’re not frequently touching the value, it’s unlikely that you’ll spend much time waiting on the lock since typically zero or one tasks will be trying to read or write to it.

2 Likes

Here's a working example I whipped up. It can run in a standalone file, since it only requires Foundation and Synchronization (for Mutex).

As Jed says, Mutex is probably "good enough", but a a reader-writer lock if contention becomes an issue. It would be nice if there was one built into the Synchorization module, but alas.

import Foundation // For Thread, Dispatch Time, etc.
import Synchronization // For Mutex

/// A dummy input.
struct Item {
	/// An ID for human readability.
	/// For demo purposes, higher IDs are more likely to take longer to process than lower IDs.
	let id: Int
}

/// A dummy result
struct ItemGroup: Sendable {
	let cost: Double

	init(cost: Double) { self.cost = cost }
}

@available(macOS 15.0, *) // For Mutex. We can drop the version if we use a different locking primiative.
final class GroupFinder: Sendable {
	private let bestGroupYetMutex: Mutex<ItemGroup?> = Mutex(nil)

	public func findBestGroup(items: [Item]) async -> ItemGroup? {
		if items.isEmpty { return nil }

		return await withTaskGroup(of: ItemGroup.self) { taskGroup in
			for item in items { // Queue up all the work
				taskGroup.addTask { @Sendable in
					await self.findBestGroup(startingWith: item)
				}
			}

			// Once everything is queued up, start waiting for the results.
			// The cooperative thread pool will run the tasks in parallel, with 1 thread per core.
			await taskGroup.waitForAll()

			return bestGroupYetMutex.withLock { bestGroupYet in bestGroupYet! }
		}
	}

	///
	private func findBestGroup(startingWith item: Item) async -> ItemGroup {
		let lowestCostYet = bestGroupYetMutex.withLock { bestGroupYet in
			// Usually it's a mistake to escape a value from a lock like this, then updating it in a separate lock.
			//
			// In this case it's okay, because a stale `lowestCostYet` value does not ruin the correctness.
			// `lowestCostYet` is just an optimization for culling computations that can't possibly
			// be better that what's already been seen so far. If we load a value and it ends up being stale,
			// we might end culling less and doing more redundant computation, but that's acceptable
			//
			// We take a point-in-time snapshot of this value, and let it escape this block.
			// This lets us do the main computation without hold the mutex the whole time, which would defeat
			// the whole point of trying to parallelize this.
			//
			// A reader-writer lock would be better here, to let multiple reads be concurrent with each other.
			// Only the writes actually need exclusive access.
			// E.g. https://github.com/SomeRandomiOSDev/ReadWriteLock/blob/main/Sources/ReadWriteLock/ReadWriteAtomic.swift
			return bestGroupYet?.cost ?? Double.infinity
		}

		let candidate = await findBestGroup(startingWith: item, lowestCostYet: lowestCostYet)

		// Update our `bestGroupYet` ASAP, so its value can be used by other starting tasks.
		bestGroupYetMutex.withLock { bestGroupYet in
			let lowestCostYet = bestGroupYet?.cost ?? Double.infinity

			if candidate.cost < lowestCostYet {
				print("New best item: \(candidate.cost)")
				bestGroupYet = candidate
			}
		}

		return candidate
	}

	private func findBestGroup(startingWith item: Item, lowestCostYet: Double) async -> ItemGroup {
		// Replace this with the real algorithm.
		let resultCost = Double.random(in: 0...Double(item.id * item.id)) / 1000
		print("Item \(item.id): Started... will take \(resultCost)s.")
		hogCPU(forSeconds: resultCost)
		print("Item \(item.id): done.")
		return ItemGroup(cost: resultCost)
	}

	private func hogCPU(forSeconds seconds: TimeInterval) {
		// Intentionally block the thread using `Thread.sleep` (e.g. instead of `Task.sleep`), to simulate
		// a CPU-intensive computation (as opposed to I/O like a network request that can yield the thread).
		Thread.sleep(forTimeInterval: seconds)

		// Alternatively, we can use a busy-wait and really use the CPU.
//		let deadline = DispatchTime.now() + DispatchTimeInterval.seconds(1)
//		while !Task<Never, Never>.isCancelled && DispatchTime.now() < deadline {}
	}
}


if #available(macOS 15, *) {
	// We base the computation time on the item IDs. For demonstration, we start large and get smaller,
	// so we're more likely to find new lowest costs as computation progresses.
	let dummyItems: Array<Item> = (1..<100).map(Item.init).reversed()
	let result = await GroupFinder().findBestGroup(items: dummyItems)
	print("Best result overall: \(result!.cost)")
}

I think this is a decent starting point, but there's a few improvements you could make:

  1. Integrate with NSProgress, or some other mechanism for progress reporting
  2. Respond to cancellation of the whole task group with try Task.checkCancellation()
  3. If your algorithm allows it, try cancelling mid-way through a computation, if bestGroupYet changes (instead of only checking it once up-front).
2 Likes

I took a stab at applying swift-atomics to my example, but unless I'm missing something, I don't think it's applicable

I was using an atomic int to store the best cost yet. Since there's no swapIfGreater/swapIfLower instruction, so there's no way to implement the the "read, compare, write only if we found a new lower value" semantics in an atomic way. Was there anything I overlooked?

1 Like

I think this will do using atomics (pseudo code):

func updateMax(value: Int) -> Int {
    while true {
        let cur = atomicLoad(&max)
        if value > cur {
            if atomicCompareAndSwap(&max, new: value, old: cur) {
                return value
            }
        } else {
            return cur
        }
    }
}
2 Likes

Ohhh I see, you just keep retrying until the retrieved value was the one you thought it was, at which point you know the swap succeeded. Thanks for showing that.

Yeah, sometimes you want to insert a pause (busy waiting loop) for a pseudo random number of iterations, like 10…1000 but I’m not sure if it actually improves things during contention, never measured myself.

And btw, you could use int64 if that’s the only atomic type you have at your disposal, and just treat (bitcast) it to Double if that’s what you want.

1 Like

Yep, or rather than storing the cost on its own, if ItemGroup was a class, it could inherit from AtomicReference and be used via ManagedAtomic<ItemGroup?>.

Thanks, that’s super helpful!

@available(macOS 15.0, *) // For Mutex. We can drop the version if we use a different locking primiative.

Well I suppose this is as good a reason as any to update my operating system…

// A reader-writer lock would be better here, to let multiple reads be concurrent with each other.
// Only the writes actually need exclusive access.
// E.g. https://github.com/SomeRandomiOSDev/ReadWriteLock/blob/main/Sources/ReadWriteLock/ReadWriteAtomic.swift

The ReadWriteLock in that repo is implemented like this:

public class ReadWriteLock {
  private var lock = pthread_rwlock_t()
  ...
}

I recall a number of discussions on this forum where people explained that this pattern was not actual sound, and to properly use a system lock it was necessary to manually allocate an unsafe pointer for it.

I don’t know for sure if that’s still the case, but I also don’t recall hearing about any changes in that regard. Here’s a thread with a really good explanation and discussion of the issues involved: Exposing the Memory Locations of Class Instance Variables

2 Likes