withTaskGroup only seems to reap results when all child tasks are done

Hi,

Looking at parallelising some filtering of large data sets using withTaskGroup, it seems that the task handling the results doesn't get scheduled until all the child tasks are done. I've played with different sized chunking of the work (from 8 to 200 child tasks on my machine with 20 cores), and regardless of what I do, the child tasks are nicely parallelised, but the reaping of the results seems to be blocked until all children have finished.

Has anyone else seen this? It doesn't seem like the expected behaviour, since out-of-order return of results is a feature of this API (the attached sample does in fact reorder the results as needed).

I had hoped to be able to process the results in parallel as they complete, and only discovered this when adding a deterministic progress bar to the reaping loop that never got updated until everything was done...

... // Yeah, we do the dirty trick with a semaphore / GCD thread here as we need to access some shared state efficiently
        let semaphore = DispatchSemaphore(value: 0)

        DispatchQueue.global(qos: .userInitiated).async {
            Task(priority: .medium) {
                await withTaskGroup(of: (Int, [DataModel._TransactionStruct.ID]).self,
                                    returning: Void.self) { taskGroup in
                    for (chunkIndex, transactionIdentifiers) in chunkedTransactions.wrappedValue.enumerated() {
                        taskGroup.addTask(priority: .low) { // We try to ensure that the child tasks are running at lower priority
                            (chunkIndex, self.filteredResults(transactionIdentifiers: ArraySlice(transactionIdentifiers),
                                                              pluginTransactions: pluginProvidedTransactionsTransfer.wrappedValue,
                                                              localSearchUpdate: searchUpdateTransfer.wrappedValue))
                        }
                    }
                    // This is just a helper array to order the results
                    // swiftlint:disable:next discouraged_optional_collection
                    var resultsArray: [[_TransactionStruct.ID]?] = .init(repeating: nil,
                                                                         count: chunkedTransactions.wrappedValue.count)
                    var nextResult = 0

                    // Here we sort the insertions by chunk index, as this gives
                    // us the items in properly sorted order, allowing us to
                    // skip the post sorting otherwise required when filtering concurrently
                    for await result in taskGroup { // presumably this is running at .medium
                        let (chunkIndex, transactionResult) = result

// We don't get the results here until all children are done, but expect this to be parallel

                        resultsArray[chunkIndex] = transactionResult
                        // Process everything that is currently correctly in-order
                        while nextResult < resultsArray.count, let nextResultToProcess = resultsArray[nextResult] {
                            nextResultToProcess.forEach {
                                transactionResults.wrappedValue.append($0)
                            }
                            nextResult += 1
                        }
                    }
                }
                semaphore.signal()
            }
        }
        semaphore.wait()
...

Any thoughts? It seems the for await result in taskGroup loop only starts returning data once everything has completed, missing a major parallelisation opportunity.

There's a lot going on here; I'll try to distill this to some core functionality and verify.

For what it's worth, it doesn't make sense to create the child tasks at low priority, because the group awaiting them immediately escalates those low-priority tasks to the priority of the task waiting on the group.

Results should, in general, be trickling into the for await ... in group loop as they complete, so that expectation is right. I'll try to distill this and verify what's happening though.

Thanks, any hints appreciated! I didn't start out setting any priority at all here, but tried adding it to enforce the desired/expected behaviour. It'd be great to be able to start processing the results as they trickle in, for sure.

For completeness, here is the full current code. It limits the number of chunks to fewer than the number of CPUs, as a task running @MainActor to update a progress bar wouldn't get any time otherwise, so it is fairly expected that all of the chunks finish at approximately the same time. I've also played with 200 chunks, though, and in that case I'd hope the reaper task would get to run and start processing.

I guess it could be as simple as us hitting two different 'abnormal' cases:

  1. If the number of chunks is less than the number of processors, they all finish at approximately the same time, so we reap them all at once.
  2. If the number of chunks is greater than the number of processors (I picked 200 in my case), the task doing the reaping doesn't get scheduled - if that is the case, what I'm missing are some knobs to influence the priority of the reaper task relative to the children, so we can get concurrency there...

I'm testing this on an array with ~10M identifiers (self.filteredResults() basically returns the subset of transaction IDs that are .filter()ed using a couple of dictionary lookups of the real data and a few string or number comparisons, depending on the filter criteria in use).

    func concurrentFilter() -> [DataModel._TransactionStruct.ID] {
        let transactionResults: UnsafeMutableTransferBox<[DataModel._TransactionStruct.ID]> = .init([])
        let pluginProvidedTransactionsTransfer: UnsafeTransfer<TransactionsDictionary> = .init(pluginProvidedTransactions)
        let searchUpdateTransfer: UnsafeTransfer<SearchUpdate> = .init(localSearchUpdate)
        let onlineCPU = onlineProcessors() 
//        let onlineCPU = 200 
        let transactionChunkSize = (pluginProvidedTransactionSortedKeys.count / (onlineCPU > 2 ? onlineCPU - 2 : 1))
        let chunkedTransactions: UnsafeTransfer<ChunksOfCountCollection<ArraySlice<DataModel._TransactionStruct.ID>>> =
            .init(ArraySlice(pluginProvidedTransactionSortedKeys)
                .chunks(ofCount: transactionChunkSize))
        transactionResults.wrappedValue.reserveCapacity(pluginProvidedTransactions.count)

        let semaphore = DispatchSemaphore(value: 0)

        DispatchQueue.global(qos: .userInitiated).async {
            Task(priority: .medium) {
                await withTaskGroup(of: (Int, [DataModel._TransactionStruct.ID]).self,
                                    returning: Void.self) { taskGroup in
                    for (chunkIndex, transactionIdentifiers) in chunkedTransactions.wrappedValue.enumerated() {
                        taskGroup.addTask(priority: .low) { // We try to ensure that the child tasks are running at lower priority
                            (chunkIndex, self.filteredResults(transactionIdentifiers: ArraySlice(transactionIdentifiers),
                                                              pluginTransactions: pluginProvidedTransactionsTransfer.wrappedValue,
                                                              localSearchUpdate: searchUpdateTransfer.wrappedValue))
                        }
                    }
                    // This is just a helper array to order the results
                    // swiftlint:disable:next discouraged_optional_collection
                    var resultsArray: [[_TransactionStruct.ID]?] = .init(repeating: nil,
                                                                         count: chunkedTransactions.wrappedValue.count)
                    var nextResult = 0

                    // Here we sort the insertions by chunk index, as this gives
                    // us the items in properly sorted order, allowing us to
                    // skip the post sorting otherwise required when filtering concurrently
                    for await result in taskGroup { // presumably this is running at .medium
                        let (chunkIndex, transactionResult) = result
                        resultsArray[chunkIndex] = transactionResult
                        // Process everything that is currently correctly in-order
                        //                        let progressCount = resultsArray.filter { $0 != nil }.count
                        //                        let filterProgress = 100.0 * Double(progressCount) / Double(resultsArray.count)
                        //                        self.searchInProgressContinuation.yield(.progress(filterProgress))
                        while nextResult < resultsArray.count, let nextResultToProcess = resultsArray[nextResult] {
                            nextResultToProcess.forEach {
                                transactionResults.wrappedValue.append($0)
                            }
                            nextResult += 1
                        }
                    }
                }
                semaphore.signal()
            }
        }
        semaphore.wait()

        return transactionResults.wrappedValue
    }

We have to reduce this some more to figure out if something is really going wrong in the withTaskGroup implementation or if it is in your code. I just reduced this to the simplest form:

await withTaskGroup(of: Int.self) { group in
    for i in 0...1000 {
        group.addTask {
            try! await Task.sleep(for: .seconds(i))
            return i
        }
    }

    for await result in group {
        print(result)
    }
}

The above code does correctly print the results as each child task completes.

One thing we have to make sure of is that you are not blocking any of the cooperative thread pool's threads. What are you doing inside your child tasks? Are you doing some heavy computation or a blocking system call? If all of the cooperative threads are occupied by child tasks, the reaping task won't get a chance to collect the next result.
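
As an illustration of what that starvation can look like (a generic sketch, not the code from this thread): if every child task does long, purely synchronous work with no suspension points, all cooperative threads stay busy, and the consuming loop only gets to run once a thread frees up.

    import Foundation

    // Generic sketch: more CPU-bound children than cores, with no suspension points.
    func saturationExample() async {
        await withTaskGroup(of: Int.self) { group in
            for i in 0..<(ProcessInfo.processInfo.activeProcessorCount * 4) {
                group.addTask {
                    // Heavy synchronous work with no suspension point keeps this
                    // cooperative thread occupied until the whole loop is done.
                    var accumulator = 0
                    for _ in 0..<50_000_000 { accumulator &+= i }
                    return accumulator
                }
            }
            for await value in group {
                // With the pool saturated, results tend to arrive in bursts as
                // threads free up, rather than strictly one at a time.
                print(value)
            }
        }
    }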

I would start by removing all Dispatch and priority usage and seeing what the natural behavior of the logic is simply within the concurrency domain. Then try to customize things if the behavior isn't what you want. Having implemented groups myself, what you're seeing doesn't match what I've seen, where child tasks finish one at a time.
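
As a rough illustration, a minimal sketch of that suggestion (with simplified placeholder element types and a placeholder predicate rather than the real DataModel types) would express the whole filter as an async function and drop the DispatchQueue/DispatchSemaphore bridge entirely:

    // Sketch only: chunks and matches stand in for the real data and filter predicate.
    func concurrentFilter(chunks: [[Int]],
                          matches: @escaping @Sendable (Int) -> Bool) async -> [Int] {
        await withTaskGroup(of: (Int, [Int]).self) { group in
            for (chunkIndex, chunk) in chunks.enumerated() {
                group.addTask { (chunkIndex, chunk.filter(matches)) }
            }

            // Collect the out-of-order results, then flatten them in chunk order.
            var ordered: [[Int]?] = Array(repeating: nil, count: chunks.count)
            for await (chunkIndex, result) in group {
                ordered[chunkIndex] = result
            }
            return ordered.compactMap { $0 }.flatMap { $0 }
        }
    }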

That is a very important point as well. I assume you call this from a DispatchQueue or your own threading system, not from a Concurrency thread. If you call something that uses DispatchSemaphore from a Concurrency thread, you are in for a bad time.

The results are correct for me; it's just that I only get all of them 'at the end', once all child tasks are finished. I want to get concurrency in the post-processing/accumulation of the reaped child task results.

I'm not blocking any cooperative threads there; it's semi-heavy computation on the ArraySlice only (a hash table lookup plus a couple of case-insensitive string searches). A typical filter operation would be e.g.:

    // Returns subset of filtered transactions identifiers that matches current search scope,
    // The relative order in the original array (slice) must be maintained to keep sorting working
    // This function uses unsafe direct access of transactions in a concurrent context and
    // will only be called from within a non-suspendable actor context to maintain consistency
    // swiftlint:disable:next cyclomatic_complexity function_body_length
    nonisolated func filteredResults(transactionIdentifiers: ArraySlice<DataModel._TransactionStruct.ID>,
                                     pluginTransactions: TransactionsDictionary,
                                     localSearchUpdate: SearchUpdate) -> [DataModel._TransactionStruct.ID] {
...
        switch localSearchUpdate.searchScope {
... // various cases here, just showing one actual as example:
        case .instrument:
            return transactionIdentifiers.filter {
                guard let transaction = pluginTransactions[$0] else {
                    fatalError("Could not find \(0) in pluginTransactions")
                }

                if let value = transaction.instrument, let instrument = globalInstruments[value],
                   let shortName = instrument.shortName, let longName = instrument.longName,
                   shortName.range(of: localSearchUpdate.searchString, options: [.caseInsensitive, .diacriticInsensitive]) != nil ||
                    longName.range(of: localSearchUpdate.searchString, options: [.caseInsensitive, .diacriticInsensitive]) != nil {
                    return true
                }
                return false
            }
...

Yeah, that's why I have the DispatchQueue.global(qos: .userInitiated).async { in there, to ensure I don't break the 'must make progress' contract for threads from Concurrency.

Setting a QoS does nothing to ensure forward progress, not unless you have some other priority inversion, and I'm not sure Dispatch QoSes would help with concurrency QoS anyway. To ensure forward progress you simply need to ensure your work isn't blocked waiting on other work that can itself be blocked. Doing a lot of unblocked work is fine. If it takes noticeable time, you may want to introduce explicit yields at particular points, just to make sure the thread pool can service other work (see the sketch below).
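
As a purely illustrative example of such a yield, assuming a long CPU-bound loop inside a child task:

    // Sketch only: a long synchronous loop that periodically offers its thread
    // back to the cooperative pool so other tasks (for example the group's
    // consuming for await loop) can be scheduled in between.
    func yieldingSum(of values: [Int]) async -> Int {
        var total = 0
        for (index, value) in values.enumerated() {
            total &+= value
            if index % 100_000 == 0 {
                await Task.yield() // explicit suspension point
            }
        }
        return total
    }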

To avoid overwhelming the thread pool, you could ensure there are never more children in flight than some number below the logical CPU count (a sketch of that pattern follows). You're already mostly there.
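
One common shape for that (a generic sketch with a hypothetical work closure, not tied to the code above) is to seed the group with a bounded number of children and only add a replacement each time a result has been consumed:

    // Sketch only: at most width children are ever in flight at once.
    func boundedMap(_ inputs: [Int],
                    width: Int,
                    work: @escaping @Sendable (Int) async -> Int) async -> [Int] {
        await withTaskGroup(of: Int.self) { group in
            var results: [Int] = []
            var iterator = inputs.makeIterator()

            // Seed the group with at most width children.
            for _ in 0..<width {
                guard let next = iterator.next() else { break }
                group.addTask { await work(next) }
            }

            // Each consumed result frees a slot for the next piece of work.
            while let result = await group.next() {
                results.append(result)
                if let next = iterator.next() {
                    group.addTask { await work(next) }
                }
            }
            return results
        }
    }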

Tomorrow I'll refactor it to use an async stream instead of a semaphore, so the task can be properly parked, and see what that gives…
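
For reference, a minimal sketch of that direction (again with placeholder element types and a placeholder filter, not the project's real code) could hand each chunk's result to the caller through an AsyncStream as soon as the corresponding child task finishes:

    // Sketch only: yields (chunkIndex, filteredChunk) pairs as they complete, so
    // the caller can accumulate results and update progress incrementally.
    func filteredResultsStream(chunks: [[Int]]) -> AsyncStream<(Int, [Int])> {
        AsyncStream { continuation in
            let task = Task {
                await withTaskGroup(of: (Int, [Int]).self) { group in
                    for (chunkIndex, chunk) in chunks.enumerated() {
                        // Placeholder filter standing in for the real predicate.
                        group.addTask { (chunkIndex, chunk.filter { $0.isMultiple(of: 2) }) }
                    }
                    for await result in group {
                        continuation.yield(result) // delivered as each chunk finishes
                    }
                }
                continuation.finish()
            }
            continuation.onTermination = { _ in task.cancel() }
        }
    }

The consumer can then simply write for await (chunkIndex, identifiers) in filteredResultsStream(chunks: chunks) { ... } and update the progress bar once per chunk instead of waiting behind a semaphore.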

For the record: I think I've managed to clean out all the extraneous Dispatch stuff and priorities now, and it seems to work as expected (testing in progress though…). I have to be careful about the initial suspension point, but that is a different issue.

Funnily enough, due to other optimizations the processing is now so fast that filtering 10+ million items on multiple properties takes less than the debounce threshold for displaying a progress bar (300 ms), so one of the original drivers for looking at this has disappeared. Thanks for the replies that helped push me in the proper direction.

But also, due to the lack of non-reentrant actor annotations, I've had to resort to @gwendal.roue's async semaphore in one place - thanks for that - but I hope the non-reentrant actor annotation in the related case will eventually surface…
