[Pitch] Limited Parallelism TaskGroup

@ktoso Please correct me if I’m wrong or misunderstood your post, but I think you may have omitted a key portion of the pattern. The above snippet will only chop the first three ingredients. But we want to chop all the ingredients, just not more than three at a time. So the pattern is something along the lines of:

func chopIngredients(_ ingredients: [any Ingredient]) async -> [any ChoppedIngredient] {
    await withTaskGroup(
        of: (ChoppedIngredient?).self,
        returning: [any ChoppedIngredient].self) 
    { group in
        // Concurrently chop ingredients
        let maxChopTasks = min(3, ingredients.count)
        
        // run first three concurrently
        for ingredientIndex in 0..<maxChopTasks {
            group.addTask { await chop(ingredients[ingredientIndex]) }
        }

        var choppedIngredients: [any ChoppedIngredient] = []

        // run the rest (if any), accumulating previous task results as we go
        for ingredientIndex in maxChopTasks..<ingredients.count {
            if let taskResult = await group.next(), let choppedIngredient = taskResult {
                choppedIngredients.append(choppedIngredient)
            }
            group.addTask { await chop(ingredients[ingredientIndex]) }
        }

        // accumulate the last tasks
        for await choppedIngredient in group {
            if let choppedIngredient {
                choppedIngredients.append(choppedIngredient)
            }
        }
        return choppedIngredients
    }
}

An alternative pattern is:


func chopIngredients(_ ingredients: [any Ingredient]) async -> [any ChoppedIngredient] {
    await withTaskGroup(
        of: (ChoppedIngredient?).self,
        returning: [any ChoppedIngredient].self) 
    { group in
        var choppedIngredients: [any ChoppedIngredient] = []

        // create tasks, running no more than 3 at a time, accumulating as we go
        for (index, ingredient) in ingredients.enumerated() {
            if index >= 3, let taskResult = await group.next(), let choppedIngredient = taskResult {
                choppedIngredients.append(choppedIngredient)
            }
            
            group.addTask { await chop(ingredient) }
        }
        
        // accumulate the last tasks
        for await choppedIngredient in group {
            if let choppedIngredient {
                choppedIngredients.append(choppedIngredient)
            }
        }
        return choppedIngredients
    }
}

Some may disagree, but I find both of these unsightly. And because we have to use this pattern all over the place, it’s subject to implementation mistakes. It also entangles the business logic of “chop a bunch of ingredients” with implementation details of how we’ll constrain it to a max of three at a time.

2 Likes