I have done no multithreading programming, and I am trying to work out how to use multiple cores to make maths computation faster. Reading around doesn't seem to be helping me. As an over-simple example, suppose I wished to measure how many numbers between 0 and 999999999 end in a zero (yeah, I know). What I would like is a "concur" keyword that is syntactically similar to "for", something like this:
var result = [0,0,0,0]
let limit = 1_000_000_000
concur myThreadNum in 0 ... 3
{
    for iter in 0 ..< limit / 4
    {
        result[myThreadNum] += (iter * 4 + myThreadNum) % 10 == 0 ? 1 : 0
    }
}
// nothing beyond the concur block will happen until all four threads have finished the block
print(result[0] + result[1] + result[2] + result[3])
Is there any way I can get close to this, or is there a fundamental reason why things have to be more complicated than this? If the four cores modifying different elements of the same array is a problem, a switch on myThreadNum could send the results to different variables, to be collated after the concur block.
There's no keyword for this specifically (generally, you don't want to solve language problems by just stamping out new keywords for every different kind of problem), but it's exactly what the TaskGroup APIs are for.
Here's what that might look like:
let limit = 1_000_000_000
let result: [Int] = await withTaskGroup(of: Int.self) { taskGroup in
    for myThreadNum in 0...3 { // Spawn 4 tasks to all start immediately
        taskGroup.addTask {
            var subcount = 0
            for iter in 0..<(limit / 4) {
                subcount += (iter * 4 + myThreadNum).isMultiple(of: 10) ? 1 : 0
            }
            return subcount
        }
    }
    var a = [Int]()
    for await taskResult in taskGroup { // Await the 4 tasks
        a.append(taskResult)
    }
    return a
}
assert(result.count == 4, "Wrong count? \(result.count)") // Just to be sure ...
print(result[0] + result[1] + result[2] + result[3])
Yep, it is. There's no way for the compiler to statically reason about which portions of the array are used by which task (or thread), so there's no way to ensure they don't overlap and cause data races.
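A minimal sketch of the usual way around this: instead of having tasks write into a shared array, each child task keeps its own local counter and returns it, and the parent combines the results. Since TaskGroup is itself an AsyncSequence, the parent can fold the results with reduce rather than building an array. The function countMultiplesOfTen and its taskCount parameter are hypothetical names chosen for illustration; the strided split also covers leftover numbers when limit isn't a multiple of taskCount.

```swift
/// Hypothetical helper: counts the numbers in 0..<limit that are multiples of 10,
/// splitting the work across `taskCount` child tasks.
func countMultiplesOfTen(below limit: Int, taskCount: Int) async -> Int {
    await withTaskGroup(of: Int.self) { group in
        for taskIndex in 0..<taskCount {
            group.addTask {
                // Each task owns its own counter, so no mutable state is shared.
                var subcount = 0
                // Strided split: task k checks k, k + taskCount, k + 2 * taskCount, ...
                for n in stride(from: taskIndex, to: limit, by: taskCount) {
                    if n.isMultiple(of: 10) { subcount += 1 }
                }
                return subcount
            }
        }
        // TaskGroup is an AsyncSequence, so the parent can fold the child results directly.
        return await group.reduce(0, +)
    }
}

let total = await countMultiplesOfTen(below: 1_000_000, taskCount: 4)
print(total) // 100000
```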
Note that if you are running a modern ARM system (like Apple Silicon) which has performance and efficiency cores, you’ll want to split up the work into a few times more pieces than you have physical cores so that the faster cores don’t finish earlier and then sit idle.
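A sketch of that advice, assuming Foundation's ProcessInfo.processInfo.activeProcessorCount as the core count and an arbitrary oversubscription factor of 4 (an assumption to tune for your workload, not a recommended value). The range is cut into contiguous chunks whose sizes differ by at most one, so no numbers are dropped when the limit isn't evenly divisible. The helpers chunkRanges and countMultiplesOfTen(in:) are hypothetical.

```swift
import Foundation

/// Hypothetical helper: splits 0..<limit into `chunkCount` contiguous ranges
/// whose sizes differ by at most one.
func chunkRanges(limit: Int, chunkCount: Int) -> [Range<Int>] {
    let base = limit / chunkCount
    let remainder = limit % chunkCount
    var ranges: [Range<Int>] = []
    var start = 0
    for i in 0..<chunkCount {
        // The first `remainder` chunks each take one extra number, so nothing is dropped.
        let size = base + (i < remainder ? 1 : 0)
        ranges.append(start..<(start + size))
        start += size
    }
    return ranges
}

/// Counts multiples of 10 across all ranges, one child task per range.
func countMultiplesOfTen(in ranges: [Range<Int>]) async -> Int {
    await withTaskGroup(of: Int.self) { group in
        for range in ranges {
            group.addTask {
                var subcount = 0
                for n in range where n.isMultiple(of: 10) { subcount += 1 }
                return subcount
            }
        }
        // With many small chunks queued, threads that finish early can pick up
        // chunks that haven't started yet instead of sitting idle.
        return await group.reduce(0, +)
    }
}

// Assumption: 4 chunks per core is an arbitrary starting point.
let oversubscription = 4
let chunkCount = ProcessInfo.processInfo.activeProcessorCount * oversubscription
let ranges = chunkRanges(limit: 1_000_000, chunkCount: chunkCount)
let total = await countMultiplesOfTen(in: ranges)
print(ranges.count, total)
```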
I don't think you can spawn a loop of async let tasks (someone please correct me), but if you hard-code the number of tasks you want, then yeah, it's possible.
Example:
let limit = 1_000_000_000
func calculateSlice(_ myThreadNum: Int) -> Int {
    var subcount = 0
    for iter in 0..<(limit / 4) {
        subcount += (iter * 4 + myThreadNum).isMultiple(of: 10) ? 1 : 0
    }
    return subcount
}
async let r0 = calculateSlice(0)
async let r1 = calculateSlice(1)
async let r2 = calculateSlice(2)
async let r3 = calculateSlice(3)
let result = await [r0, r1, r2, r3]
print(result[0] + result[1] + result[2] + result[3])
Once we get MutableSpan, I also want to do something like add a version of withContiguousStorageIfAvailable that presents each element of the Array as its own separate Span, so that you can operate on them in parallel.
That would be nice in general, but I still wouldn't use it in this particular case.
This code had a split source of truth: it encoded the thread count in two different places (the 0 ... 3 range and result = [0,0,0,0]). It could crash if the thread count were changed in one place but not the other.
The dynamically-sized/growing nature of TaskGroups fixes that.
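A sketch of what that looks like in practice, using a hypothetical helper named partialCounts: taskCount is the only place the number of tasks is written down, the loop bound and the stride both derive from it, and the results array simply grows as child tasks finish, so there is no fixed-size array to keep in sync.

```swift
/// Hypothetical helper: runs `taskCount` child tasks and returns one count per task.
/// Results arrive in completion order, not task order.
func partialCounts(limit: Int, taskCount: Int) async -> [Int] {
    await withTaskGroup(of: Int.self) { group in
        for taskIndex in 0..<taskCount {
            group.addTask {
                var subcount = 0
                // Same interleaving as the original: task k checks k, k + taskCount, ...
                for n in stride(from: taskIndex, to: limit, by: taskCount) where n.isMultiple(of: 10) {
                    subcount += 1
                }
                return subcount
            }
        }
        // The array grows to match however many tasks were added.
        var counts: [Int] = []
        for await count in group {
            counts.append(count)
        }
        return counts
    }
}

let counts = await partialCounts(limit: 1_000_000, taskCount: 8)
print(counts.count, counts.reduce(0, +)) // 8 100000
```

Changing the task count now means editing a single argument.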
I'm just going to point out that if you iterate the TaskGroup, you will get the task results in the order they complete, not in the order they began. If you want to keep the order, each child task needs to return its index along with its count (which also means changing the group to withTaskGroup(of: (Int, Int).self)):
return (myThreadNum, subcount)
and
var a = [Int](repeating: 0, count: 4)
for await taskResult in taskGroup { // Await the 4 tasks
    let (index, subcount) = taskResult
    a[index] = subcount
}
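Putting those fragments together, a sketch of the order-preserving version might look like this. orderedPartialCounts is a hypothetical name; each task returns an (index, count) pair, and the parent writes each count into its matching slot, so the result no longer depends on which task finishes first.

```swift
/// Hypothetical helper: returns the per-task counts in task order rather than completion order.
func orderedPartialCounts(limit: Int, taskCount: Int) async -> [Int] {
    await withTaskGroup(of: (Int, Int).self) { group in
        for taskIndex in 0..<taskCount {
            group.addTask {
                var subcount = 0
                for n in stride(from: taskIndex, to: limit, by: taskCount) where n.isMultiple(of: 10) {
                    subcount += 1
                }
                // Tag the result with its task index so the parent can slot it back in place.
                return (taskIndex, subcount)
            }
        }
        var counts = [Int](repeating: 0, count: taskCount)
        for await (index, subcount) in group {
            counts[index] = subcount
        }
        return counts
    }
}

let ordered = await orderedPartialCounts(limit: 100, taskCount: 4)
print(ordered) // [5, 0, 5, 0]
```

The zeros are expected: with four interleaved tasks, tasks 1 and 3 only ever see odd numbers, which can never end in a zero.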
The manual collection loop can also be replaced with a general-purpose extension that builds a collection from any asynchronous sequence, TaskGroup included:
extension RangeReplaceableCollection {
    /// Creates a new instance of a collection containing the elements of an asynchronous sequence.
    ///
    /// - Parameter source: The asynchronous sequence of elements for the new collection.
    @inlinable
    public init<Source: AsyncSequence>(_ source: Source) async rethrows where Source.Element == Element {
        self.init()
        for try await item in source {
            append(item)
        }
    }
}
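A sketch of how that extension might be used inside a task group (the extension is repeated so the example compiles on its own). collectedCounts is a hypothetical helper; because TaskGroup's iterator doesn't throw, the rethrows initializer needs no try here.

```swift
extension RangeReplaceableCollection {
    /// Creates a new instance of a collection containing the elements of an asynchronous sequence.
    @inlinable
    public init<Source: AsyncSequence>(_ source: Source) async rethrows where Source.Element == Element {
        self.init()
        for try await item in source {
            append(item)
        }
    }
}

/// Hypothetical helper: the task-group version with the manual collection loop
/// replaced by the initializer above.
func collectedCounts(limit: Int, taskCount: Int) async -> [Int] {
    await withTaskGroup(of: Int.self) { group in
        for taskIndex in 0..<taskCount {
            group.addTask {
                var subcount = 0
                for n in stride(from: taskIndex, to: limit, by: taskCount) where n.isMultiple(of: 10) {
                    subcount += 1
                }
                return subcount
            }
        }
        // Collects the child results (in completion order) into an array.
        return await [Int](group)
    }
}

let collected = await collectedCounts(limit: 1_000, taskCount: 4)
print(collected.reduce(0, +)) // 100
```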