Instead of a busy-waiting loop, there are pause instructions designed for exactly this purpose, e.g. for x86:
(random quick googled reference; assuming something similar exists for ARM)
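For what it's worth, plain Swift has no way to emit those hints directly (x86's pause is reachable from C as _mm_pause, but that needs a small shim), so a pure-Swift spin-wait sketch might fall back to sched_yield after a few iterations. Rough sketch, assuming the swift-atomics package:

import Atomics
import Darwin // for sched_yield

/// Polls an atomic flag, backing off by yielding the time slice.
/// A C shim emitting `pause`/`yield` would be gentler on the core; this
/// sticks to calls available from pure Swift on Apple platforms.
func spinWait(on flag: ManagedAtomic<Bool>) {
    var spins = 0
    while !flag.load(ordering: .acquiring) {
        spins += 1
        if spins > 64 { sched_yield() } // stop burning CPU under contention
    }
}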
Not worth raising the version just for Mutex, IMO.
I was able to make this work with a ManagedAtomic based on @tera's suggestion, which drops the requirement down to macOS 10.15 (for withTaskGroup(...)).
import Foundation // For Thread, DispatchTime, etc.
import Atomics

/// A dummy input.
struct Item {
    /// An ID for human readability.
    /// For demo purposes, higher IDs are more likely to take longer to process than lower IDs.
    let id: Int
}

/// A dummy result.
final class ItemGroup: Sendable, AtomicReference {
    let cost: Double
    init(cost: Double) { self.cost = cost }
}

@available(macOS 10.15, *)
final class GroupFinder: Sendable {
    private let bestGroupYetAtomic: ManagedAtomic<ItemGroup?> = .init(nil)

    public func findBestGroup(items: [Item]) async -> ItemGroup? {
        if items.isEmpty { return nil }
        return await withTaskGroup(of: ItemGroup.self) { taskGroup in
            for item in items { // Queue up all the work.
                taskGroup.addTask { @Sendable in
                    await self.findBestGroup(startingWith: item)
                }
            }
            // Once everything is queued up, start waiting for the results.
            // The cooperative thread pool will run the tasks in parallel, with 1 thread per core.
            await taskGroup.waitForAll()
            return bestGroupYetAtomic.load(ordering: .sequentiallyConsistent)
        }
    }

    private func findBestGroup(startingWith item: Item) async -> ItemGroup {
        // The `lowestCostYet` is just an optimization for culling computations that can't possibly
        // be better than what's already been seen so far. If we load a value and it ends up being stale,
        // we might end up culling less and doing more redundant computation. So it's a trade-off between:
        // 1. `.sequentiallyConsistent` (more culling, but higher contention) and
        // 2. `.relaxed` (less contention, but less culling and potentially more redundant computation).
        let lowestCostYet = bestGroupYetAtomic.load(ordering: .relaxed)?.cost ?? Double.infinity
        let candidate = await findBestGroup(startingWith: item, lowestCostYet: lowestCostYet)
        // Update our `bestGroupYet` ASAP, so its value can be used by other starting tasks.
        updateIfLowerCost(candidate)
        return candidate
    }

    private func updateIfLowerCost(_ candidate: ItemGroup) {
        var exchanged = false
        while !exchanged {
            let current = bestGroupYetAtomic.load(ordering: .sequentiallyConsistent)
            if candidate.cost < current?.cost ?? Double.infinity {
                exchanged = bestGroupYetAtomic.compareExchange(
                    expected: current,
                    desired: candidate,
                    ordering: .sequentiallyConsistent
                ).exchanged
            } else {
                return // The existing cost was lower. Don't change it.
            }
        }
        print("New best item: \(candidate.cost)")
    }

    private func findBestGroup(startingWith item: Item, lowestCostYet: Double) async -> ItemGroup {
        // Replace this with the real algorithm.
        let resultCost = Double.random(in: 0...Double(item.id * item.id)) / 1000
        print("Item \(item.id): Started... will take \(resultCost)s.")
        hogCPU(forSeconds: resultCost)
        print("Item \(item.id): done.")
        return ItemGroup(cost: resultCost)
    }

    private func hogCPU(forSeconds seconds: TimeInterval) {
        // Intentionally block the thread using `Thread.sleep` (e.g. instead of `Task.sleep`), to simulate
        // a CPU-intensive computation (as opposed to I/O like a network request that can yield the thread).
        Thread.sleep(forTimeInterval: seconds)
        // Alternatively, we can use a busy-wait and really use the CPU.
        // let deadline = DispatchTime.now() + DispatchTimeInterval.seconds(1)
        // while DispatchTime.now() < deadline {}
    }
}

if #available(macOS 10.15, *) {
    // We base the computation time on the item IDs. For demonstration, we start large and get smaller,
    // so we're more likely to find new lowest costs as computation progresses.
    let dummyItems: Array<Item> = (1..<100).map(Item.init).reversed()
    let result = await GroupFinder().findBestGroup(items: dummyItems)
    print("Best result overall: \(result!.cost)")
}
Good catch.
True. rwlocks also have other issues (they degrade to mutexes under surprisingly light write loads due to reader-reader contention while writers are pending, and they break priority donation). Those may or may not be important for your usage though.
Oh that’s great, thanks again!
How about something like this? No mutex needed:
struct Solution {...}
struct Input {...}

func doSomething(_: Input, _: Solution?) -> Solution { ... }

...

await withTaskGroup(of: Solution.self) { group in
    var numTasks = 0
    var bestSolution: Solution? = nil

    func startTask() {
        if let nextInput = ... {
            group.addTask { () -> Solution in
                return doSomething(nextInput, bestSolution)
            }
            numTasks += 1
        }
    }

    func completeTask(_ solution: Solution) {
        if bestSolution == nil || solution < bestSolution! {
            bestSolution = solution
        }
        numTasks -= 1
    }

    for _ in 0 ..< 32 {
        startTask()
    }

    for await solution in group {
        startTask()
        completeTask(solution)
    }
}
If I understand correctly, this would mean that the first 32 tasks all start from nil, and no more than 32 tasks ever run at the same time.
On a device with only 4 cores, 28 tasks would unnecessarily start from nil when they “could have” gotten a head start.
And on a device with 96 cores, 64 of them would remain idle the whole time.
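As an aside, a hypothetical tweak would be to size that initial batch to the machine instead of hard-coding 32 (ProcessInfo comes from Foundation), though it doesn't help with the cold-start problem:

for _ in 0..<ProcessInfo.processInfo.activeProcessorCount {
    startTask()
}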
That's right, but of course you can change the parameter. However, yeah, the other problem you pointed out is inherent in my approach and requires shared state to solve.
After doing a bit of reading, I think this should use weakCompareExchange since it's in a loop, because that might be more efficient on some systems. Also, it can use the return value of the compare-exchange to update current, so there's only one atomic access within the loop:
private func updateIfLowerCost(_ candidate: ItemGroup) {
    var exchanged = false
    var current = bestGroupYetAtomic.load(ordering: .sequentiallyConsistent)
    while !exchanged {
        if candidate.cost < current?.cost ?? Double.infinity {
            (exchanged, current) = bestGroupYetAtomic.weakCompareExchange(
                expected: current,
                desired: candidate,
                ordering: .sequentiallyConsistent
            )
        } else {
            return // The existing cost was lower. Don't change it.
        }
    }
    print("New best item: \(candidate.cost)")
}
Does that sound right?
And… I don’t know enough about atomic memory orderings to judge for myself, but would this still work correctly if the initial load (before the loop) used .relaxed ordering? Would there be any benefit?
I assume the actual exchange must be sequentially consistent, but I’m not entirely sure what could go wrong if it weren’t.
Nice findings. It all seems plausibly correct, but I don't know for sure.
Can someone please explain the reasoning behind the weak vs. strong compareExchange functions? The docs mention:

This compare-exchange variant is allowed to spuriously fail ... In this weak form, transient conditions may cause the original == expected check to sometimes return false when the two values are in fact the same.

I'm sure there's some benefit to this odd behaviour; perhaps it lowers to some different instruction that affords the hardware some clever trick to do something better. What is it, exactly?
Re. memory orderings: The Swift docs on memory ordering refer to the C++ std::memory_order counterparts. But the docs for those were clearly designed for lawyers, and I'm a mere software engineer, so I'm a bit out of my league lol
Actually, now that I think about it, this could work really well just by letting the number of tasks increase:
startTask()

for await solution in group {
    completeTask(solution)
    for _ in 0..<3 {
        startTask()
    }
}
Now only one task starts from nil, and each time one finishes, several more begin. So this should quickly saturate the available processors.
It doesn’t need locks or atomics, though it does mean the first task runs alone until it completes. Obviously I could launch more than one task at the outset, but starting from nil is kind of a big slowdown so I’d rather only do it once.
An improved design might launch one task, wait until it finds the first valid group, and then start launching more tasks. That level of optimization may be more than I need to worry about though.
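For what it's worth, that might look something like this rough sketch, reusing the Solution type and the startTask/completeTask helpers from the sketch above (the fan-out count is illustrative, and ProcessInfo needs Foundation):

await withTaskGroup(of: Solution.self) { group in
    startTask() // A single seed task; it alone starts from nil.
    guard let first = await group.next() else { return }
    completeTask(first) // bestSolution is now non-nil.
    // Fan out only once a real bound exists, so every later task gets a head start.
    for _ in 0..<ProcessInfo.processInfo.activeProcessorCount {
        startTask()
    }
    for await solution in group {
        completeTask(solution)
        startTask()
    }
}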
Another possibility is to keep a count of the number of pending tasks together with a count of started tasks. This requires a single atomic, since the started-task counter needs to be incremented in the Task closure itself. Then you can continue starting new tasks until the number of pending tasks exceeds the number of started tasks, then wait for one to complete, and so on.
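One possible reading of that, as a rough sketch (the types and numbers are placeholders, and "pending" here means the cumulative count of tasks added to the group, so pending minus started is the backlog the scheduler hasn't gotten to yet):

import Atomics

struct Input: Sendable { let id: Int }
struct Solution: Sendable { let cost: Double }

func doSomething(_ input: Input, _ best: Solution?) -> Solution {
    Solution(cost: Double(input.id)) // stand-in for the real work
}

// Cumulative count of tasks that have begun executing. Atomic, because each
// child task increments it from inside its own closure.
let startedCount = ManagedAtomic<Int>(0)
var inputs = (0..<100).map(Input.init)

await withTaskGroup(of: Solution.self) { group in
    var pendingCount = 0 // tasks added so far; only touched from this context
    func startTask() {
        guard let nextInput = inputs.popLast() else { return }
        pendingCount += 1
        group.addTask {
            startedCount.wrappingIncrement(ordering: .relaxed)
            return doSomething(nextInput, nil)
        }
    }
    while true {
        // Keep adding tasks until at least one is queued but not yet running,
        // i.e. until the pending count exceeds the started count.
        while !inputs.isEmpty,
              pendingCount <= startedCount.load(ordering: .relaxed) {
            startTask()
        }
        guard let solution = await group.next() else { break } // group drained
        _ = solution // record/compare the result here
    }
}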
…is there a good way to do this?
That is, inside the withTaskGroup closure I want to start one task. At some point in that task, it decides “Okay, now it’s time to add more tasks to the group.” Can I call @Slava_Pestov’s startTask function from within the already-running task, in order to add more tasks to the same task group?