I am grateful that I stumbled upon the phrase 'thread explosion', as this is the first time I have encountered the problem with GCD.
I have application code I have relied on for a long time, built around a GCD concurrency pattern that has worked for years without issue: trivial parallelization of a task across a defined number of threads, set equal to the number of available cores.
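(For reference, the core count is obtained roughly as in the sketch below; this is illustrative only, and my actual code may determine the count differently.)

import Foundation

// one possible way to set the chunk/thread count from the available cores
let nchunks = ProcessInfo.processInfo.activeProcessorCount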
This code works fine under macOS and Linux, but in an Ubuntu 22.04 Docker container running on a GPU on-demand service (runpod.io), where I expect to see the specified 10 threads running, I see up to 200+ threads, and the computation takes essentially as long as if it were single-threaded.
The pattern I have used 'forever' is to have two queues: a concurrent queue to run the parallel computations for separate chunks of data, and a serial queue so that each task can deposit its partial result in a pre-allocated array that I always call BLOCKS. BLOCKS is initialized with as many nils as the specified number of threads, and the partial result for each task is stored at the appropriate index.
The code below shows the pattern I am following. Have I gotten 'away' with a poor design pattern for a long time?
Thanks in advance,
Randy
import Dispatch

// create queues
let computeQueue = DispatchQueue( label:"compute", attributes: .concurrent )
let blocksQueue = DispatchQueue( label:"blocks" )

// example of using the pattern in a function

func someFunc(....) {

    /* initial set up partitions of data */

    // Initialize array to receive results with data partitioned into 'nchunks' pieces
    var BLOCKS:[[(AtomCircle,Int)]?] = Array(repeating:nil, count:nchunks)

    // loop to spawn 'nchunks' tasks to run parallel computations, first
    // creating a dispatch group
    let group = DispatchGroup()

    for ichunk in 0..<nchunks {
        group.enter()
        computeQueue.async {
            // call a function to process this chunk, result returned as 'data'
            let data = circlesForAtoms( atompos:atompos, radii:radii, proberad:proberad,
                                        axis:axis, minCoord:minaxiscoord, delta:layerdelta,
                                        limits:LIMITS[ichunk], thread:ichunk )
            // schedule a synchronous call to a function that assigns the partial
            // result to an index in BLOCKS for later processing (func addBLOCK
            // shown below)
            blocksQueue.sync {
                addBLOCK( &BLOCKS, data )
            }
            group.leave()
        }   // end of async closure
    }   // end of loop to process data chunks

    // wait for all tasks to complete
    group.wait()

    /* additional processing to merge results */

}   /** end of someFunc **/

// here is the addBLOCK function to assign partial results to the array;
// the task/thread index is the second member of the tuple returned by the task
func addBLOCK( _ BLOCKS: inout [[(AtomCircle,Int)]?], _ data:([(AtomCircle,Int)],Int) ) {
    BLOCKS[data.1] = data.0
}