Looking at parallelising some filtering or large data sets using waitWithTaskGroup, it seems that the task handling the results won't get scheduled until all the child tasks are done - I've played with different sized chunking of the work (from 8 - 200 child tasks on my machine with 20 cores) and it seems that regardless of what I do, the child tasks are nicely parallelized but the reaping of results seems to be blocked until all children are done.
Anyone else who've seen this? It doesn't seem as expected really as out-of-order return of the results is a feature of this api (the sample attached does in fact reorder the results as needed).
I had hoped to be able to process the results in parallel as they are done (and just discovered this when adding a deterministic progress bar in the reaping loop that never got updated until all was done...).
... // Yeah, we do the dirty trick with a semaphore / GCD thread here as we need to access some shared state efficiently
let semaphore = DispatchSemaphore(value: 0)
DispatchQueue.global(qos: .userInitiated).async {
Task(priority: .medium) {
await withTaskGroup(of: (Int, [DataModel._TransactionStruct.ID]).self,
returning: Void.self) { taskGroup in
for (chunkIndex, transactionIdentifiers) in chunkedTransactions.wrappedValue.enumerated() {
taskGroup.addTask(priority: .low) { // We try to ensure that the child tasks are running at lower priority
(chunkIndex, self.filteredResults(transactionIdentifiers: ArraySlice(transactionIdentifiers),
pluginTransactions: pluginProvidedTransactionsTransfer.wrappedValue,
localSearchUpdate: searchUpdateTransfer.wrappedValue))
}
}
// This is just a helper array to order the results
// swiftlint:disable:next discouraged_optional_collection
var resultsArray: [[_TransactionStruct.ID]?] = .init(repeating: nil,
count: chunkedTransactions.wrappedValue.count)
var nextResult = 0
// Here we sort the insertions by chunk index, as this gives
// us the items in properly sorted order, allowing us to
// skip the post sorting otherwise required when filtering concurrently
for await result in taskGroup { // presumably this is running at .medium
let (chunkIndex, transactionResult) = result
// We don't get the results here until all children are done, but expect this to be parallel
resultsArray[chunkIndex] = transactionResult
// Process everything that is currently correctly in-order
while nextResult < resultsArray.count, let nextResultToProcess = resultsArray[nextResult] {
nextResultToProcess.forEach {
transactionResults.wrappedValue.append($0)
}
nextResult += 1
}
}
}
semaphore.signal()
}
}
semaphore.wait()
...
Any thoughts? It seems the for await result in taskGroup is only starting to return data as all is completed, missing a major parallelisation opportunity.
There's a lot going on here, I'll try to distill this to some core functionality and verify.
For what it's worth it doesn't make sense to create child tasks at low priority because the group awaiting immediately would escalate those low priority tasks to the priority of the task waiting on the group.
Results should be trickling into the for await ... in group as they complete in general, so that expectation is right in general. I'll try to distill this and verify what's happening though.
Thanks, any hints appreciated! I didn't start out with setting any priority at all here, but tried to add it to enforce the desired/expected behaviour. It'd be great to be able to start processing the results as they trickle in for sure.
For completeness, here is the full current code (which limits the number of chunks to less than the number of cpus, as we wouldn't get time for a task running @MainActor to update a progress bar otherwise, so it is fairly expected all of them are done approximately at the same time, but I've played with 200 chunks as well (and then I'd hope to get the reaper task running and start processing)).
I guess it can be as silly that we have two different 'abnormal' cases:
If the number of chunks are < number of processors, they all end up finishing approximately at the same time, so we reap them all at the same time
If the number of chunks is > number of processors (I picked 200 in my case), our task doing the reaping doesn't get scheduled - if that is the case, what I'm missing are some knobs to influence the priority of the reaper task from the children, so we can get concurrency there...
I'm testing this on an array with ~10M identifiers (self.filteredResults() basically returns a subset of transaction id:s that are .filter()ed out with a couple of dictionary lookups of the real data and a few string or number comparisons depending on filter criteria in use).
func concurrentFilter() -> [DataModel._TransactionStruct.ID] {
let transactionResults: UnsafeMutableTransferBox<[DataModel._TransactionStruct.ID]> = .init([])
let pluginProvidedTransactionsTransfer: UnsafeTransfer<TransactionsDictionary> = .init(pluginProvidedTransactions)
let searchUpdateTransfer: UnsafeTransfer<SearchUpdate> = .init(localSearchUpdate)
let onlineCPU = onlineProcessors()
// let onlineCPU = 200
let transactionChunkSize = (pluginProvidedTransactionSortedKeys.count / (onlineCPU > 2 ? onlineCPU - 2 : 1))
let chunkedTransactions: UnsafeTransfer<ChunksOfCountCollection<ArraySlice<DataModel._TransactionStruct.ID>>> =
.init(ArraySlice(pluginProvidedTransactionSortedKeys)
.chunks(ofCount: transactionChunkSize))
transactionResults.wrappedValue.reserveCapacity(pluginProvidedTransactions.count)
let semaphore = DispatchSemaphore(value: 0)
DispatchQueue.global(qos: .userInitiated).async {
Task(priority: .medium) {
await withTaskGroup(of: (Int, [DataModel._TransactionStruct.ID]).self,
returning: Void.self) { taskGroup in
for (chunkIndex, transactionIdentifiers) in chunkedTransactions.wrappedValue.enumerated() {
taskGroup.addTask(priority: .low) { // We try to ensure that the child tasks are running at lower priority
(chunkIndex, self.filteredResults(transactionIdentifiers: ArraySlice(transactionIdentifiers),
pluginTransactions: pluginProvidedTransactionsTransfer.wrappedValue,
localSearchUpdate: searchUpdateTransfer.wrappedValue))
}
}
// This is just a helper array to order the results
// swiftlint:disable:next discouraged_optional_collection
var resultsArray: [[_TransactionStruct.ID]?] = .init(repeating: nil,
count: chunkedTransactions.wrappedValue.count)
var nextResult = 0
// Here we sort the insertions by chunk index, as this gives
// us the items in properly sorted order, allowing us to
// skip the post sorting otherwise required when filtering concurrently
for await result in taskGroup { // presumably this is running at .medium
let (chunkIndex, transactionResult) = result
resultsArray[chunkIndex] = transactionResult
// Process everything that is currently correctly in-order
// let progressCount = resultsArray.filter { $0 != nil }.count
// let filterProgress = 100.0 * Double(progressCount) / Double(resultsArray.count)
// self.searchInProgressContinuation.yield(.progress(filterProgress))
while nextResult < resultsArray.count, let nextResultToProcess = resultsArray[nextResult] {
nextResultToProcess.forEach {
transactionResults.wrappedValue.append($0)
}
nextResult += 1
}
}
}
semaphore.signal()
}
}
semaphore.wait()
return transactionResults.wrappedValue
}
We have to reduce this some more to figure out if something is really going wrong in the withTaskGroup implementation or if it is in your code. I just reduced this to the simplest from:
await withTaskGroup(of: Int.self) { group in
for i in 0...1000 {
group.addTask {
try! await Task.sleep(for: .seconds(i))
return i
}
}
for await result in group {
print(result)
}
}
The above code does correctly print the results.
One thing that we have to make sure is that you are not blocking any of the cooperative thread pool threads. What are you doing inside your child tasks? Are you doing some heavy computation or a blocking system call? If all child tasks are scheduled on a thread then the reaping task won't have a chance to collect the next result
I would start by removing all Dispatch and priority usage and seeing what the natural behavior of the logic is simply within the concurrency domain. Then try to customize things if the behavior isn't what you want. Having implemented groups myself, what you're seeing doesn't match what I've seen, where child tasks finish one at a time.
That is a very important point as well. I assume you call this from a DispatchQueue or your own threading system but not a Concurrency thread. If you can something that uses DispatchSemaphore from a Concurrency thread then you are in for a bad time.
The results are correct for me, it is just that I only get all of them 'at the end' when all child tasks are finished - I want to get concurrency on the post processing / accumulation of the reaped child task results.
I'm not blocking any cooperative threads there, it is semi-heavy computations on the ArraySlice only (hash table lookup + a couple of case insensitive string searches), a typical filter operation would be e.g.:
// Returns subset of filtered transactions identifiers that matches current search scope,
// The relative order in the original array (slice) must be maintained to keep sorting working
// This function uses unsafe direct acess of transactions in a concurrent context and
// will only be called from within a non-suspendable actor context to maintain consistency
// swiftlint:disable:next cyclomatic_complexity function_body_length
nonisolated func filteredResults(transactionIdentifiers: ArraySlice<DataModel._TransactionStruct.ID>,
pluginTransactions: TransactionsDictionary,
localSearchUpdate: SearchUpdate) -> [DataModel._TransactionStruct.ID] {
...
switch localSearchUpdate.searchScope {
... // various cases here, just showing one actual as example:
case .instrument:
return transactionIdentifiers.filter {
guard let transaction = pluginTransactions[$0] else {
fatalError("Could not find \(0) in pluginTransactions")
}
if let value = transaction.instrument, let instrument = globalInstruments[value],
let shortName = instrument.shortName, let longName = instrument.longName,
shortName.range(of: localSearchUpdate.searchString, options: [.caseInsensitive, .diacriticInsensitive]) != nil ||
longName.range(of: localSearchUpdate.searchString, options: [.caseInsensitive, .diacriticInsensitive]) != nil {
return true
}
return false
}
...
Yeah, that's why I have the DispatchQueue.global(qos: .userInitiated).async { in there, to ensure I don't break the 'must make progress' contract for threads from Concurrency.
Setting a QoS does nothing to ensure forward progress, not unless you have some other priority inversion, and I'm not sure Dispatch QoSes would help concurrency QoS anyway. To ensure forward progress you simply need to ensure your work isn't blocked waiting on other work that can be blocked. Doing a lot of unblocked work is fine. If it takes noticeable time you may want to introduce explicit yields at particular points just to make sure the thread pool can service other work.
To avoid overwhelming the thread pool you could ensure there are never more than some number less than the logical CPUs children in flight. You're already mostly there.
for the record; I think Iāve managed to clean out all the extraneous dispatch stuff and priorities now and it seems to work as expected (testing in progress thoughā¦), have to be careful about the initial suspension point though, but that is a different issue.
Funnily enough, due to other optimizations the processing now is so fast that filtering 10+ million items on multiple properties is less than the debounce time threshold for displaying a progress bar (300ms) so one of the original drivers looking at this disappeared. Thanks for the replies that helped push me in the proper direction.