[Pitch] Limited Parallelism TaskGroup

The example above would require you to suck the entire "source-sequence" into memory just for it to sit there waiting to be scheduled.

imho the thing to get right is combining the task-group bit with the semaphore bit in a way that supports back-pressure (i.e. only read as much from the input as you are currently processing).

AsyncSemaphore on its own is kind of complete as it is, but it is not the whole story for this feature.

Now, whether this should be an Rx-style "map" function or a more declarative shape that you can for await in, I am not sure about. My gut feeling says that a declarative, for-eachable version would be more en vogue in Swift, but I doubt anyone is thinking "we should add more types of TaskGroups, we don't have enough already". :thinking:

Rx-style map functions get fiddly quickly too, especially with structured concurrency and the async sequence shape (it feels like there would have to be an unstructured task somewhere....) :thinking:

The AsyncSemaphore that you have shared uses an NSRecursiveLock internally, which is not truly asynchronous. Using such locks in async/await contexts breaks the structured concurrency pattern. The actor that I showed in my implementation would suffice to act as a semaphore. I guess @ktoso will agree with me on this.
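
For reference, a minimal sketch of such an actor-based semaphore (names are illustrative, and it ignores task cancellation for brevity):

actor AsyncSemaphoreActor {
    private var permits: Int
    private var waiters: [CheckedContinuation<Void, Never>] = []

    init(permits: Int) {
        precondition(permits > 0)
        self.permits = permits
    }

    // Suspends when no permit is available; the actor serializes all state access.
    func wait() async {
        if permits > 0 {
            permits -= 1
        } else {
            await withCheckedContinuation { waiters.append($0) }
        }
    }

    // Hands the permit directly to the oldest waiter, if any.
    func signal() {
        if waiters.isEmpty {
            permits += 1
        } else {
            waiters.removeFirst().resume()
        }
    }
}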


If we want to read only as many tasks as we need, we could define a producer protocol that is used to get the respective task on demand when the TaskGroup is ready to execute it (i.e. the number of current tasks is less than the allowed max). I consider this similar to the cellForRowAt method of UITableViewDataSource, if you are familiar with UIKit.

So, the protocol will be like this:

protocol TaskSource<T>: Sendable {
    associatedtype T: Sendable
    func getTask(at index: Int) -> @Sendable () async -> T
}

let source = someInstanceThatConformsToProtocol()

let taskGroup = LimitedParallelismTaskGroup(maxConcurrentTasks: 2, taskSource: source)

for await result in taskGroup {
    // do something with result
}

So the producer need not give all tasks upfront and can create the tasks on demand. @ktoso
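
For illustration, a conformance could be as simple as this (a hypothetical example; T is inferred from the return type):

struct SquaringSource: TaskSource {
    let inputs: [Int]

    // Each task is created lazily, only when the group asks for it.
    func getTask(at index: Int) -> @Sendable () async -> Int {
        let value = inputs[index]
        return { value * value }
    }
}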

I think you might've misunderstood the implementation details. Although the library does use the theoretically-unsafe-from-async lock/unlock APIs, the author paid a lot of attention to detail to never hold the lock across suspension points.
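
To illustrate the pattern being described (a sketch with assumed names, not the library's actual code): the lock is only ever taken in synchronous sections and released before the task actually suspends:

import Foundation

final class LockedSemaphore: @unchecked Sendable {
    private let lock = NSLock()
    private var permits: Int
    private var waiters: [CheckedContinuation<Void, Never>] = []

    init(permits: Int) {
        self.permits = permits
    }

    func wait() async {
        await withCheckedContinuation { continuation in
            // This closure runs synchronously, so the lock never spans an await.
            lock.lock()
            if permits > 0 {
                permits -= 1
                lock.unlock()
                continuation.resume()   // no suspension needed
            } else {
                waiters.append(continuation)
                lock.unlock()           // released before the caller actually parks
            }
        }
    }

    func signal() {
        lock.lock()
        if waiters.isEmpty {
            permits += 1
            lock.unlock()
        } else {
            let waiter = waiters.removeFirst()
            lock.unlock()
            waiter.resume()             // resume outside the lock
        }
    }
}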

Fine, but the solution can be made simpler by using actors

I don't think that is necessary at all.

For the group-style API, we'd simply need to suspend on addTask.

E.g. (amending your example from above):

let taskGroup = LimitedParallelismTaskGroup(maxConcurrentTasks: 2) { group in
  for await item in mySource {
    await group.submitTask { // <- awaiting here is all we'd need
      await doSomeLongRunningTask(item)
    }
  }
}

But I don't think this is a good API shape.

The easiest way to get this somewhat cleanly is I think a forEach function, like:

try await mySource.parallelForEach(maxConcurrent: 5) { item in
    try await myOperation(item)
}

This way we can stay in "structured concurrency" and don't have to think about the back-pressure of the results buffer, as we would if it were a "pipeline" function like a parallelAsyncMap.
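
For reference, a minimal sketch of how such a parallelForEach could be built on a plain task group (the extension name and parameter label are just the placeholders from above):

extension AsyncSequence where Element: Sendable {
    func parallelForEach(
        maxConcurrent: Int,
        _ body: @escaping @Sendable (Element) async throws -> Void
    ) async throws {
        precondition(maxConcurrent > 0)
        try await withThrowingTaskGroup(of: Void.self) { group in
            var running = 0
            for try await element in self {
                if running >= maxConcurrent {
                    // Back-pressure: wait for a slot before pulling more input.
                    _ = try await group.next()
                } else {
                    running += 1
                }
                group.addTask { try await body(element) }
            }
            try await group.waitForAll()
        }
    }
}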

What does mySource indicate here?

some AsyncSequence

Yes, agreed that an awaiting submit is hard to use in practice, because it is the same task that needs to pull elements off the group -- so if you're stuck waiting to submit, you can't pull elements off. This causes a weird stall: you can't free the memory of elements in the group which are already completed until another task completes, freeing you up to call next() on the group after the addTask gets to run, etc. Original designs were considering such a shape at some point, but it was just too awkward to use in practice.

Maybe still worth reconsidering, but I would recommend always thinking about examples which not only submit but also pull elements off the group.

So, the AsyncSequence would act as the producer protocol, right? But I think this will add complexity for API users, as an AsyncSequence has to be implemented each time just to provide tasks to an executor :thinking:

@ktoso Please correct me if I’m wrong or misunderstood your post, but I think you may have omitted a key portion of the pattern. The above snippet will only chop the first three ingredients. But we want to chop all the ingredients, just not more than three at a time. So the pattern is something along the lines of:

func chopIngredients(_ ingredients: [any Ingredient]) async -> [any ChoppedIngredient] {
    await withTaskGroup(
        of: ((any ChoppedIngredient)?).self,
        returning: [any ChoppedIngredient].self
    ) { group in
        // Concurrently chop ingredients
        let maxChopTasks = min(3, ingredients.count)
        
        // run first three concurrently
        for ingredientIndex in 0..<maxChopTasks {
            group.addTask { await chop(ingredients[ingredientIndex]) }
        }

        var choppedIngredients: [any ChoppedIngredient] = []

        // run the rest (if any), accumulating previous task results as we go
        for ingredientIndex in maxChopTasks..<ingredients.count {
            if let taskResult = await group.next(), let choppedIngredient = taskResult {
                choppedIngredients.append(choppedIngredient)
            }
            group.addTask { await chop(ingredients[ingredientIndex]) }
        }

        // accumulate the last tasks
        for await choppedIngredient in group {
            if let choppedIngredient {
                choppedIngredients.append(choppedIngredient)
            }
        }
        return choppedIngredients
    }
}

An alternative pattern is:


func chopIngredients(_ ingredients: [any Ingredient]) async -> [any ChoppedIngredient] {
    await withTaskGroup(
        of: ((any ChoppedIngredient)?).self,
        returning: [any ChoppedIngredient].self
    ) { group in
        var choppedIngredients: [any ChoppedIngredient] = []

        // create tasks, running no more than 3 at a time, accumulating as we go
        for (index, ingredient) in ingredients.enumerated() {
            if index >= 3, let taskResult = await group.next(), let choppedIngredient = taskResult {
                choppedIngredients.append(choppedIngredient)
            }
            
            group.addTask { await chop(ingredient) }
        }
        
        // accumulate the last tasks
        for await choppedIngredient in group {
            if let choppedIngredient {
                choppedIngredients.append(choppedIngredient)
            }
        }
        return choppedIngredients
    }
}

Some may disagree, but I find both of these unsightly. And because we have to use this pattern all over the place, it’s subject to implementation mistakes. It also entangles the business logic of “chop a bunch of ingredients” with implementation details of how we’ll constrain it to a max of three at a time.


At the risk of beating a dead horse: while I modeled my code snippets above on the original example, generally I would ensure I present the results in the order of the original array, not in a random order. I'd also make this "build the results collection using a task group with constrained concurrency" a generic function.

E.g., maybe, if one wanted an array of results with constrained concurrency, we’d make sure to order those results:

@inlinable public func arrayWithTaskGroup<ChildTaskInput, ChildTaskResult>(
    of childTaskResultType: ChildTaskResult.Type = ChildTaskResult.self,
    isolation: isolated (any Actor)? = #isolation,
    from array: [ChildTaskInput],
    maxConcurrent: Int,
    operation: sending @escaping @isolated(any) (ChildTaskInput) async -> ChildTaskResult
) async -> [ChildTaskResult] where ChildTaskResult : Sendable {
    await withTaskGroup(of: (Int, ChildTaskResult).self) { group in
        precondition(maxConcurrent > 0)

        var results: [Int: ChildTaskResult] = [:]

        for (index, input) in array.enumerated() {
            if index >= maxConcurrent, let result = await group.next() {
                results[result.0] = result.1
            }
            group.addTask { await (index, operation(input)) }
        }

        for await (index, result) in group {
            results[index] = result
        }

        return array.indices.compactMap { results[$0] }
    }
}

Then my chopping routine is simplified, and the ugly “constrain concurrency” logic is abstracted out:

func chopIngredients(_ ingredients: [any Ingredient]) async -> [(any ChoppedIngredient)?] {
    await arrayWithTaskGroup(from: ingredients, maxConcurrent: 3) { ingredient in
        await chop(ingredient)
    }
}

So, despite the tasks (constrained to a max of 3 at a time) finishing in a somewhat random order, the final results are in the order of the original array.
Or, I might return an order-independent structure. E.g., if the input was Identifiable, I might return a dictionary keyed by ID of the input:

@inlinable public func dictionaryWithTaskGroup<ChildTaskInput, ChildTaskResult>(
    of childTaskResultType: ChildTaskResult.Type = ChildTaskResult.self,
    isolation: isolated (any Actor)? = #isolation,
    from array: [ChildTaskInput],
    maxConcurrent: Int,
    operation: sending @escaping @isolated(any) (ChildTaskInput) async -> ChildTaskResult
) async -> [ChildTaskInput.ID: ChildTaskResult] where ChildTaskInput: Identifiable, ChildTaskResult: Sendable {
    await withTaskGroup(of: (ChildTaskInput.ID, ChildTaskResult).self) { group in
        precondition(maxConcurrent > 0)

        var results: [ChildTaskInput.ID: ChildTaskResult] = [:]

        for (index, input) in array.enumerated() {
            if index >= maxConcurrent, let result = await group.next() {
                results[result.0] = result.1
            }
            group.addTask { await (input.id, operation(input)) }
        }

        for await (id, result) in group {
            results[id] = result
        }

        return results
    }
}

Or if the input was Hashable, I might just use the input as the key to the dictionary:

@inlinable public func dictionaryWithTaskGroup<ChildTaskInput, ChildTaskResult>(
    of childTaskResultType: ChildTaskResult.Type = ChildTaskResult.self,
    isolation: isolated (any Actor)? = #isolation,
    from array: [ChildTaskInput],
    maxConcurrent: Int,
    operation: sending @escaping @isolated(any) (ChildTaskInput) async -> ChildTaskResult
) async -> [ChildTaskInput: ChildTaskResult] where ChildTaskInput: Hashable, ChildTaskResult: Sendable {
    await withTaskGroup(of: (ChildTaskInput, ChildTaskResult).self) { group in
        precondition(maxConcurrent > 0)

        var results: [ChildTaskInput: ChildTaskResult] = [:]

        for (index, input) in array.enumerated() {
            if index >= maxConcurrent, let result = await group.next() {
                results[result.0] = result.1
            }
            group.addTask { await (input, operation(input)) }
        }

        for await (input, result) in group {
            results[input] = result
        }

        return results
    }
}

There are a bunch of ways of doing this, but hopefully this illustrates a few patterns for abstracting the “maximum concurrent tasks” logic out of our application code. Likewise, the above could be optimized/refined further, but I was trying to keep it simple for the sake of legibility.


Heh, I copied the snippet from our WWDC session "snippets" -- seems we have an incomplete snippet there. Yeah, as you gather a task's result you should be adding another task to the group.

I don't think anyone is disputing that this pattern is verbose and needs something better, so there's no need to beat on it much more; let's instead focus on proposed solutions and improvements.


After some mild thinking about this, my initial conclusion is that we should add parallelMap and parallelForEach to the async-algorithms package (names are just placeholders, whatever feels most idiomatic).

Adding directly to the stdlib would be cool of course, but so far we don't even have "proper" back-pressured stream types - so I wouldn't expect to see the proposed features there anytime soon.

// if you care about results (will require an unstructured task inside and a back-pressured output stream)
let outputStream = myLargeInputStream
  .parallelMap(concurrency: 5) {
    await doTheThingWithResult($0)
  }

for await result in outputStream {
  print(result)
}
// or collect with `let results = await Array(outputStream)`

// if you don't care about results
// similar to a discarding task group, for a "sink-only" feature (no buffering or unstructured task required)
await myLargeInputStream.parallelForEach(concurrency: 5) {
  await doTheThing($0)
}

The only somewhat tricky bit is that the parallelMap function needs to have an unstructured task robustly hooked up with an internal task group and a buffering output stream (we can hopefully use one of @FranzBusch's proposed back-pressured stream types). But even that seems pretty straightforward to do.
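
For illustration, a rough sketch of that shape, using a plain AsyncStream as the output buffer (a real implementation would want an actually back-pressured stream and more careful cancellation handling; all names are placeholders):

extension AsyncSequence where Element: Sendable, Self: Sendable {
    func parallelMap<T: Sendable>(
        concurrency: Int,
        _ transform: @escaping @Sendable (Element) async -> T
    ) -> AsyncStream<T> {
        AsyncStream<T> { continuation in
            // The unstructured task that owns the internal task group.
            let task = Task {
                try? await withThrowingTaskGroup(of: T.self) { group in
                    var running = 0
                    for try await element in self {
                        if running >= concurrency {
                            // Only pull more input once a slot frees up.
                            if let result = try await group.next() {
                                continuation.yield(result)
                            }
                        } else {
                            running += 1
                        }
                        group.addTask { await transform(element) }
                    }
                    // Drain the remaining children; results arrive in
                    // completion order, not input order.
                    for try await result in group {
                        continuation.yield(result)
                    }
                }
                continuation.finish()
            }
            continuation.onTermination = { @Sendable _ in task.cancel() }
        }
    }
}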

Just my two cents, but I'd like to reiterate that I'd much prefer this problem being solved in a task-group-agnostic way. In my experience solving the "limited parallelism" problem, you often need the primitive to be escapable (which a task group can/will probably never be) so it can be used with unstructured/unrelated tasks.

However, this could later be expanded to be combined with a task group, if this is a common use case (code from similar thread):

struct Limiter {
  init(limit: Int) // either have explicit Ints here or maybe also pool size relatives like others have suggested
  func withControl<R>(operation: () async throws -> R) async throws -> R { ... }
}

... and then maybe:

await with[Throwing|Discarding]TaskGroup(limiter: limiter) { taskGroup in
  for something in array {
    taskGroup.addTask { ... }
  }
}
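
For concreteness, a minimal sketch of how withControl could work, reusing the actor-based semaphore sketched earlier in the thread (assumed names, cancellation ignored):

struct Limiter: Sendable {
    private let semaphore: AsyncSemaphoreActor

    init(limit: Int) {
        semaphore = AsyncSemaphoreActor(permits: limit)
    }

    // Acquires a permit around the operation, releasing it on both the
    // success and the error path.
    func withControl<R>(operation: () async throws -> R) async throws -> R {
        await semaphore.wait()
        do {
            let result = try await operation()
            await semaphore.signal()
            return result
        } catch {
            await semaphore.signal()
            throw error
        }
    }
}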

Unfortunately, I don't think this approach solves the eager vs. lazy consumption of arrays or async sequences yet, but maybe Limiter could be used for this as well in combination with the suggested parallelMap/parallelForEach; I haven't thought about it much, though.

I would also reiterate that a key design issue here is really to ensure that "some" cores are left for interactive/low-latency requirements too - the other thing is to rate-limit so we don't kill a server resource that can't handle too many requests. It would be great to find an API surface that covers both issues and allows expressing both things, as they seem fundamentally related and intertwined in a more complex application.


Thinking a bit about it, I wouldn't mind a global knob for reserving global concurrent threads in fact, rather than doing it per task group or similar.

For the kind of applications where you want to perform heavy work concurrently, but keep a couple of threads of the concurrent thread pool to handle ad-hoc interactive stuff, it would in fact be convenient to be able to control this on a global basis without having to tweak every call site (including those that might exist in third party packages)...

That would be extremely helpful - we are in an environment where the minimum core count for desktops is 16; we can have significant heavy parallelism going on in the background, with many task groups running in various subsystems, but we would like to keep, say, 2 threads from the concurrent thread pool "free" at all times for user responsiveness in certain operations.

Currently that is not super straightforward. Being able to tweak such a global knob would just fundamentally address most of the problem.


I personally fixed this problem a few years ago and later decided to open-source it as drain-work-pool. I’m using it in my CLI tools for image processing, download, and upload tasks, so I can still do other things while the CLI works with a limited number of parallel tasks.