How to parallelize across CPU cores?

Instead of a busy-waiting loop, there are pause instructions typically intended for this purpose, e.g. for x86:

(random quick googled reference, assuming similar exists for ARM)

Not worth raising the version just for Mutex, IMO.

I was able to make this work with a ManagedAtomic based on @tera's suggestion, which drops the requirement down to macOS 10.15 (the minimum for withTaskGroup(...)).

Demo using ManagedAtomic
import Foundation // For Thread, DispatchTime, etc.
import Atomics

/// A dummy input.
struct Item {
	/// An ID for human readability.
	/// For demo purposes, higher IDs are more likely to take longer to process than lower IDs.
	let id: Int
}

/// A dummy result.
final class ItemGroup: Sendable, AtomicReference {
	let cost: Double

	init(cost: Double) { self.cost = cost }
}

@available(macOS 10.15, *)
final class GroupFinder: Sendable {
	private let bestGroupYetAtomic: ManagedAtomic<ItemGroup?> = .init(nil)

	public func findBestGroup(items: [Item]) async -> ItemGroup? {
		if items.isEmpty { return nil }

		return await withTaskGroup(of: ItemGroup.self) { taskGroup in
			for item in items { // Queue up all the work
				taskGroup.addTask { @Sendable in
					await self.findBestGroup(startingWith: item)
				}
			}

			// Once everything is queued up, start waiting for the results.
			// The cooperative thread pool will run the tasks in parallel, with 1 thread per core.
			await taskGroup.waitForAll()

			return bestGroupYetAtomic.load(ordering: .sequentiallyConsistent)
		}
	}

	private func findBestGroup(startingWith item: Item) async -> ItemGroup {
		// The `lowestCostYet` is just an optimization for culling computations that can't possibly
		// be better than what's already been seen so far. If we load a value and it ends up being stale,
		// we might end up culling less and doing more redundant computation. So it's a trade-off between:
		// 1. `.sequentiallyConsistent` (more culling, but higher contention) and
		// 2. `.relaxed` (less contention, but less culling and potentially more redundant computation).
		let lowestCostYet = bestGroupYetAtomic.load(ordering: .relaxed)?.cost ?? Double.infinity

		let candidate = await findBestGroup(startingWith: item, lowestCostYet: lowestCostYet)

		// Update our `bestGroupYet` ASAP, so its value can be used by other starting tasks.
		updateIfLowerCost(candidate)

		return candidate
	}

	private func updateIfLowerCost(_ candidate: ItemGroup) {
		var exchanged = false

		while !exchanged {
			let current = bestGroupYetAtomic.load(ordering: .sequentiallyConsistent)

			if candidate.cost < current?.cost ?? Double.infinity {
				exchanged = bestGroupYetAtomic.compareExchange(
					expected: current,
					desired: candidate,
					ordering: .sequentiallyConsistent
				).exchanged
			} else {
				return // The existing cost was lower. Don't change it.
			}
		}

		print("New best item: \(candidate.cost)")
	}

	private func findBestGroup(startingWith item: Item, lowestCostYet: Double) async -> ItemGroup {
		// Replace this with the real algorithm.
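		// (The dummy below ignores `lowestCostYet`; the real algorithm would use it to cull
		// work that can't beat the best cost seen so far.)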
		let resultCost = Double.random(in: 0...Double(item.id * item.id)) / 1000
		print("Item \(item.id): Started... will take \(resultCost)s.")
		hogCPU(forSeconds: resultCost)
		print("Item \(item.id): done.")
		return ItemGroup(cost: resultCost)
	}

	private func hogCPU(forSeconds seconds: TimeInterval) {
		// Intentionally block the thread using `Thread.sleep` (e.g. instead of `Task.sleep`), to simulate
		// a CPU-intensive computation (as opposed to I/O like a network request that can yield the thread).
		Thread.sleep(forTimeInterval: seconds)

		// Alternatively, we can use a busy-wait and really use the CPU:
//		let deadline = DispatchTime.now() + seconds
//		while DispatchTime.now() < deadline {}
	}
}

if #available(macOS 10.15, *) {
	// We base the computation time on the item IDs. For demonstration, we start large and get smaller,
	// so we're more likely to find new lowest costs as computation progresses.
	let dummyItems: Array<Item> = (1..<100).map(Item.init).reversed()
	let result = await GroupFinder().findBestGroup(items: dummyItems)
	print("Best result overall: \(result!.cost)")
}
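
For reference, the Atomics import above comes from the swift-atomics package, so running this demo needs a dependency along these lines in Package.swift (the target name here is a hypothetical placeholder):

dependencies: [
  .package(url: "https://github.com/apple/swift-atomics.git", from: "1.1.0"),
],
targets: [
  .executableTarget(
    name: "GroupFinderDemo", // hypothetical target name
    dependencies: [.product(name: "Atomics", package: "swift-atomics")]
  ),
]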

Good catch.

True. rwlocks also have other issues (they degrade to mutexes under surprisingly light write loads due to reader-reader contention while writers are pending, and they break priority donation). Those may or may not be important for your usage though.

Oh that’s great, thanks again!

How about something like this? No mutex needed:

struct Solution {...}
struct Input {...}

func doSomething(_ input: Input, _ bestSolution: Solution?) -> Solution { ... }

...

await withTaskGroup(of: Solution.self) { group in
  var numTasks = 0
  var bestSolution: Solution? = nil
  func startTask() {
    if let nextInput = ... {
      group.addTask { () -> Solution in
        return doSomething(nextInput, bestSolution)
      }
      numTasks += 1
    }
  }

  func completeTask(_ solution: Solution) {
    if bestSolution == nil || solution < bestSolution! {
      bestSolution = solution
    }
    numTasks -= 1
  }

  for _ in 0 ..< 32 {
    startTask()
  }

  for await solution in group {
    startTask()
    completeTask(solution)
  }
}
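
Filling in the blanks, a minimal runnable version of that sketch might look like the following. The dummy types stand in for the ... placeholders, with one adjustment: an addTask closure is @Sendable, so it can't capture the mutated bestSolution var directly; a let snapshot is captured instead.

import Foundation

struct Input: Sendable { let id: Int }

struct Solution: Comparable, Sendable {
  let cost: Double
  static func < (lhs: Solution, rhs: Solution) -> Bool { lhs.cost < rhs.cost }
}

func doSomething(_ input: Input, _ bestSolution: Solution?) -> Solution {
  // Dummy work; a real implementation would use `bestSolution` for culling.
  Solution(cost: Double.random(in: 0...Double(input.id)))
}

await withTaskGroup(of: Solution.self) { group in
  var remaining = (1...100).map { Input(id: $0) }
  var numTasks = 0
  var bestSolution: Solution? = nil

  func startTask() {
    if let nextInput = remaining.popLast() {
      let best = bestSolution // snapshot; @Sendable closures can't capture a mutated var
      group.addTask { () -> Solution in
        return doSomething(nextInput, best)
      }
      numTasks += 1
    }
  }

  func completeTask(_ solution: Solution) {
    if bestSolution == nil || solution < bestSolution! {
      bestSolution = solution
    }
    numTasks -= 1
  }

  for _ in 0 ..< 32 {
    startTask()
  }

  for await solution in group {
    startTask()
    completeTask(solution)
  }

  print("Best solution: \(bestSolution?.cost ?? .nan)")
}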

If I understand correctly, this would mean that the first 32 tasks all start from nil, and no more than 32 tasks ever run at the same time.

On a device with only 4 cores, 28 tasks would unnecessarily start from nil when they “could have” gotten a head start.

And on a device with 96 cores, 64 of them would remain idle the whole time.

That's right, but of course you can change the parameter. However, yeah, the other problem you pointed out is inherent to my approach and requires shared state to solve.

After doing a bit of reading, I think this should use weakCompareExchange since it’s in a loop, because that might be more efficient on some systems. Also, it can use the return value of the compare-exchange to update current, so there’s only one atomic access within the loop:

private func updateIfLowerCost(_ candidate: ItemGroup) {
  var exchanged = false
  var current = bestGroupYetAtomic.load(ordering: .sequentiallyConsistent)
  
  while !exchanged {
    if candidate.cost < current?.cost ?? Double.infinity {
      (exchanged, current) = bestGroupYetAtomic.weakCompareExchange(
        expected: current,
        desired: candidate,
        ordering: .sequentiallyConsistent
      )
    } else {
      return // The existing cost was lower. Don't change it.
    }
  }

  print("New best item: \(candidate.cost)")
}

Does that sound right?

And… I don’t know enough about atomic memory orderings to judge for myself, but would this still work correctly if the initial load (before the loop) used .relaxed ordering? Would there be any benefit?

I assume the actual exchange must be sequentially consistent, but I’m not entirely sure what could go wrong if it weren’t.

Nice findings. It all seems plausibly correct, but I don't know :smiley:

Can someone please explain the reasoning behind the weak vs. strong compareExchange functions? The docs mention:

This compare-exchange variant is allowed to spuriously fail ... In this weak form, transient conditions may cause the original == expected check to sometimes return false when the two values are in fact the same.

I'm sure there's some benefit to this odd behaviour, perhaps it lowers to some different instruction that affords the hardware some clever trick to do something better. What is it, exactly?

Re. memory orderings: The Swift docs on memory ordering refer to the C++ std::memory_order counterparts. But the docs for those were clearly designed for lawyers, and I'm a mere software engineer, so I'm a bit out of my league lol

Actually, now that I think about it, this could work really well just by letting the number of tasks increase:

  startTask()
  
  for await solution in group {
    completeTask(solution)
    for _ in 0..<3 {
      startTask()
    }
  }

Now only one task starts from nil, and each time one finishes, several more begin. So this should quickly saturate the available processors.

It doesn’t need locks or atomics, though it does mean the first task runs alone until it completes. Obviously I could launch more than one task at the outset, but starting from nil is kind of a big slowdown so I’d rather only do it once.

An improved design might launch one task, wait until it finds the first valid group, and then start launching more tasks. That level of optimization may be more than I need to worry about though.
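
That staged version might look something like this inside the same task group, assuming the startTask/completeTask helpers from above and Foundation's ProcessInfo for a core count:

  startTask()

  if let firstSolution = await group.next() {
    completeTask(firstSolution) // every later task now starts from a non-nil bestSolution

    // Ramp up to roughly one task per core, then continue one-for-one.
    for _ in 0 ..< ProcessInfo.processInfo.activeProcessorCount {
      startTask()
    }
    for await solution in group {
      startTask()
      completeTask(solution)
    }
  }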

Another possibility is to keep a count of pending tasks together with a count of started tasks. That needs only a single atomic, since it's just the started-task counter that has to be incremented inside the task closure itself. Then you can keep starting new tasks until the number of pending tasks exceeds the number of started tasks, wait for one to complete, and so on.
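
A sketch of that counting idea, reusing the Input/Solution/doSomething stand-ins from the filled-in version above; the backlog threshold and the Task.yield() call are guesses at the unstated details:

import Atomics

await withTaskGroup(of: Solution.self) { group in
  // `started` is incremented inside each task closure, so it must be atomic.
  // `added`, `remaining`, and `bestSolution` are only touched in this body.
  let started = ManagedAtomic<Int>(0)
  var remaining = (1...100).map { Input(id: $0) }
  var added = 0
  var bestSolution: Solution? = nil

  func startTask() -> Bool {
    guard let nextInput = remaining.popLast() else { return false }
    let best = bestSolution
    group.addTask {
      started.wrappingIncrement(ordering: .relaxed)
      return doSomething(nextInput, best)
    }
    added += 1
    return true
  }

  // Keep adding tasks while everything added so far has actually begun
  // running; a backlog of not-yet-started tasks means the pool is saturated.
  func rampUp() async {
    while added - started.load(ordering: .relaxed) <= 1 {
      guard startTask() else { break }
      await Task.yield() // give the pool a chance to pick up the new task
    }
  }

  await rampUp()
  while let solution = await group.next() {
    if bestSolution == nil || solution < bestSolution! {
      bestSolution = solution
    }
    await rampUp()
  }

  print("Best solution: \(bestSolution?.cost ?? .nan)")
}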

…is there a good way to do this?

That is, inside the withTaskGroup closure I want to start one task. At some point in that task, it decides “Okay, now it’s time to add more tasks to the group.”

Can I call @Slava_Pestov’s startTask function from within the already-running task, in order to add more tasks to the same task group?