[Pitch] Task Pools

Introduction

Task groups are the building block of structured concurrency, allowing for the Swift runtime to relate groups of tasks together. This enables powerful features such as automatic cancellation propagation, correctly propagating errors, and ensuring well-defined lifetimes, as well as providing diagnostic information to programming tools.

The version of Task Groups introduced in SE-0304 provides all of these features. However, it also provides the ability to propagate return values to the user of the task group. This capability introduces an unexpected limitation in some use-cases.

As users of Task Groups are able to retrieve the return values of child tasks, it implicitly follows that the Task Group preserves at least the Result of any completed child task. As a practical matter, the task group actually preserves the entire Task object. This data is preserved until the user consumes it via one of the Task Group consumption APIs, whether by calling next() or by iterating the Task Group.

The result of this is that Task Groups are ill-suited to running for a potentially unbounded amount of time. An example of such a use-case is managing connections accepted from a listening socket. A simplified example of such a workload might be:

try await withThrowingTaskGroup(of: Void.self) { group in
    while let newConnection = try await listeningSocket.accept() {
        group.addTask {
            handleConnection(newConnection)
        }
    }
}

As written, this task group will leak all the child Task objects until the listening socket either terminates or throws. If this were written for a long-running server, it is entirely possible for this Task Group to survive for a period of days, leaking thousands of Task objects. For stable servers, this will eventually drive the process into memory exhaustion, forcing it to be killed by the OS.

The current implementation of Task Groups does not provide a practical way to avoid this issue. Task Groups are (correctly) not Sendable, so neither the consumption of completed Task results nor the submission of new work can be moved to a separate Task.

The most natural attempt to avoid this unbounded memory consumption would be to attempt to occasionally purge the completed task results. An example might be:

try await withThrowingTaskGroup(of: Void.self) { group in
    while let newConnection = try await listeningSocket.accept() {
        group.addTask {
            handleConnection(newConnection)
        }
        try await group.next()
    }
}

Unfortunately, all of the methods for attempting to pop the queue of completed Tasks will suspend if all currently live child Tasks are executing. This means that the above pattern (or any similar pattern) is at risk of occasional livelocks, where pending connections could be accepted, but the Task is blocked waiting for existing work to complete.

There is only one design pattern to avoid this issue, which involves forcibly bounding the maximum concurrency of the Task Group. This pattern looks something like the below:

try await withThrowingTaskGroup(of: Void.self) { group in
    // Fill the task group up to maxConcurrency
    for _ in 0..<maxConcurrency {
        guard let newConnection = try await listeningSocket.accept() else {
            break
        }

        group.addTask { handleConnection(newConnection) }
    }

    // Now follow a one-in-one-out pattern
    while true {
        _ = try await group.next()
        guard let newConnection = try await listeningSocket.accept() else {
            break
        }
        group.addTask { handleConnection(newConnection) }
    }
}

While this is workable, it forces users to determine a value for maxConcurrency. This is frequently very hard to decide a priori. In practice users tend to take a wild guess, and either choose a value far too large (wasting memory) or far too small (underutilizing the system). While there is value in developing a strategy for bounding the maximum concurrency of a TaskGroup, that problem is sufficiently complex to be worth its own separate discussion.

Proposed Solution

We propose adding two new types, called TaskPool and ThrowingTaskPool. TaskPools are very similar to TaskGroups, but differ in the following ways:

  1. Child tasks can only have Void return type.
  2. [Throwing]TaskPools have no next() method and are not AsyncSequences.
  3. [Throwing]TaskPools automatically clean up their child Tasks when those Tasks complete.

They maintain many of the same behaviours as TaskGroup, albeit in some cases with slightly different manifestations:

  1. ThrowingTaskPools are automatically cancelled when one of their child Tasks terminates with a thrown error.
  2. ThrowingTaskPools that are cancelled in this way will, after awaiting all their child Tasks, throw the error that originally caused them to be auto-cancelled.
  3. Automatic cancellation propagation works as usual, so cancelling the Task that owns a [Throwing]TaskPool automatically cancels all child Tasks.
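
To make these semantics concrete, here is a sketch of how error propagation might surface under the proposed API. This is illustrative only: doSomeWork and WorkError are hypothetical stand-ins, and the exact behaviour is whatever the proposal specifies.

// Sketch only: assumes the proposed withThrowingTaskPool API exists.
struct WorkError: Error { let id: Int }

// Hypothetical helper that fails for one particular input.
func doSomeWork(_ id: Int) async throws {
    if id == 42 { throw WorkError(id: id) }
}

func runBatch() async {
    do {
        try await withThrowingTaskPool(returning: Void.self) { pool in
            for id in 0..<100 {
                pool.addTask {
                    // Completed children are cleaned up as they finish;
                    // nothing accumulates waiting to be consumed.
                    try await doSomeWork(id)
                }
            }
            // Falling off the end of the closure awaits the remaining children.
        }
    } catch {
        // The child that threw WorkError auto-cancelled its siblings; once they
        // have been awaited, that first error is rethrown here.
        print("Batch failed: \(error)")
    }
}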

While it is conceptually possible to implement all of this as an optional behaviour on TaskGroup (see "Alternatives Considered" for more details), there is one valuable reason to define a new type: as a result of the missing next() method, TaskPool can be Sendable.

Sendable Task Pool

Conceptually a TaskPool hides a multi-producer queue of completed Tasks. Whenever a child Task completes, either successfully or with an error, it adds itself to the queue of completed Tasks that is owned by the parent Task.

To manage the complexity of this queue, it was decided to make TaskGroup non-Sendable. This means that the next() method on TaskGroup can only be called from a single Task. The result of this change is that the queue in question only needs to be a multi-producer-single-consumer queue. Allowing the next() method to be called from multiple Tasks requires the queue to be multi-producer-multi-consumer, a substantially harder queue type to build in a performant fashion.

Because TaskPool does not have a next() method, this problem does not exist. Conceptually, there are no consumers for the queue at all. We still need to cancel the parent Task safely when the first child Task completes, but that is a substantially more constrained problem that is far easier to solve performantly. As a result, there is no longer any need to prevent other Tasks from having access to the TaskPool.

This leads to two new capabilities. First, we can safely make [Throwing]TaskPool Sendable. This enables us to pass the TaskPool into child Tasks, and so enables those child Tasks to themselves add further child Tasks. That is, we can now enable the following pattern:

try await withTaskPool { pool in
    pool.addTask {
        let result = try await doSomeWork()
        if let followOn = result.followOnOperation {
            pool.addTask {
                handleFollowOn(followOn)
            }
        }
    }
}

This capability allows TaskPools to encapsulate chains of work that may run in parallel. While this removes some of the benefits of structured concurrency, the extra flexibility is very valuable. This capability is present in structured concurrency implementations in other languages (for example, Trio's Nursery). Having it available in Swift Concurrency enables more complex task trees to be built without relying on AsyncSequence to provide the communication substrate.

API Surface

public func withTaskPool<PoolResult>(
  returning returnType: PoolResult.Type = PoolResult.self,
  body: (inout TaskPool) async throws -> PoolResult
) async rethrows -> PoolResult

@frozen
public struct TaskPool: Sendable {
  public mutating func addTask(
    priority: TaskPriority? = nil,
    operation: @Sendable @escaping () async -> Void
  )

  public mutating func addTaskUnlessCancelled(
    priority: TaskPriority? = nil,
    operation: @Sendable @escaping () async -> Void
  ) -> Bool

  public mutating func waitForAll() async

  public var isEmpty: Bool

  public func cancelAll()

  public var isCancelled: Bool
}

public func withThrowingTaskPool<PoolResult>(
  returning returnType: PoolResult.Type = PoolResult.self,
  body: (inout ThrowingTaskPool) async throws -> PoolResult
) async throws -> PoolResult

@frozen
public struct ThrowingTaskPool: Sendable {
  public mutating func addTask(
    priority: TaskPriority? = nil,
    operation: @Sendable @escaping () async throws -> Void
  )

  public mutating func addTaskUnlessCancelled(
    priority: TaskPriority? = nil,
    operation: @Sendable @escaping () async throws -> Void
  ) -> Bool

  public mutating func waitForAll() async throws

  public var isEmpty: Bool

  public func cancelAll()

  public var isCancelled: Bool
}
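
To relate this API back to the motivating example, the accept loop could be written as follows. This is a sketch against the proposed API; listeningSocket and handleConnection are the same hypothetical helpers used in the introduction.

// Sketch only: the motivating accept loop rewritten against the proposed API.
try await withThrowingTaskPool(returning: Void.self) { pool in
    while let newConnection = try await listeningSocket.accept() {
        pool.addTask {
            // The pool reclaims this child as soon as it completes, so the loop
            // can run indefinitely without accumulating Task objects.
            handleConnection(newConnection)
        }
    }
}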

Alternatives Considered

Alternative behaviour on TaskGroup

Instead of defining a new type, we could add a new boolean parameter to withTaskGroup and withThrowingTaskGroup, discardResults. When set to true, [Throwing]TaskGroup.next() would always return nil, and all child Tasks would be automatically cleaned up promptly.
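
As a rough illustration, such an overload might be used like this (the discardResults parameter is hypothetical and exists only in this sketch):

// Hypothetical alternative: a discardResults flag on the existing API. Child
// results would be dropped as tasks complete, and next() would always return nil.
try await withThrowingTaskGroup(of: Void.self, discardResults: true) { group in
    while let newConnection = try await listeningSocket.accept() {
        group.addTask {
            handleConnection(newConnection)
        }
    }
}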

This has the major advantage of avoiding introducing a new concept. Users who are already familiar with TaskGroup do not need to be confronted with and understand how TaskPools are different. This reduction in cognitive burden is valuable, as structured concurrency can already be quite complex to learn.

There are two disadvantages of this approach. The first is that we cannot make TaskGroup conditionally Sendable based on a runtime-provided parameter, so we cannot gain the capability to add child Tasks from within child Tasks. The second disadvantage is that we have a strange, vestigial next() function that will always return nil. This is a fairly fundamental behavioural change, particularly as it affects the AsyncSequence conformance of TaskGroup.

For these reasons we have decided to pitch a new type, but the community should strongly consider whether the cognitive burden of the new type is justified.

Error throwing behaviour

The pitch proposes that ThrowingTaskPool will throw only the first error thrown by a child Task. This means that all subsequent errors will be discarded, which is an unfortunate loss of information. Two alternative behaviours could be chosen: we could not add ThrowingTaskPool at all, or we could throw an aggregate error that contains all errors thrown by the child Tasks.

Not adding ThrowingTaskPool is a substantial ergonomic headache. Automatic error propagation is one of the great features of structured concurrency, and not being able to use it with TaskPools is an unnecessary limitation, especially as it's not particularly technically challenging to propagate errors. For this reason, we do not think it's wise to omit ThrowingTaskPool.

The other alternative is to throw an aggregate error. This would require that TaskPool persist all (or almost all) errors thrown by child tasks and merge them together into a single error struct that is thrown. This idea is a mixed bag.

The main advantage of throwing an aggregate error is that no information is lost. Programs can compute on all errors that were thrown, and at the very least can log or provide other metrics based on those errors. Avoiding data loss in this way is valuable, and gives programmers more flexibility.

Throwing an aggregate error has two principal disadvantages. The first is that aggregate errors do not behave gracefully in catch statements. If a child task has thrown MyModuleError, programmers would like to write catch MyModuleError in order to handle it. Aggregate errors break this, even if only one error was thrown: programmers have to write catch let error as MyAggregateError where error.baseErrors.contains(where: { $0 is MyModuleError }), or something else equally painful.
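
To illustrate the contrast, consider the following sketch. MyModuleError is a placeholder for a library's own error type, and MyAggregateError (with its baseErrors property) is a hypothetical shape for an aggregate error; neither is part of this proposal.

struct MyModuleError: Error {}

// Hypothetical shape for an aggregate error type.
struct MyAggregateError: Error {
    let baseErrors: [any Error]
}

func handleSingleError(_ work: () async throws -> Void) async {
    do {
        try await work()
    } catch is MyModuleError {
        // Direct, readable handling when a plain error is rethrown.
    } catch {
        // Everything else.
    }
}

func handleAggregateError(_ work: () async throws -> Void) async {
    do {
        try await work()
    } catch let aggregate as MyAggregateError
        where aggregate.baseErrors.contains(where: { $0 is MyModuleError }) {
        // The same intent becomes noticeably more awkward.
    } catch {
        // Everything else.
    }
}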

The other main disadvantage is the storage bloat from CancellationError. The first thrown error will auto-cancel all child Tasks. This is great, but that cancellation will likely manifest in a thrown CancellationError, which will presumably bubble to the top and be handled by the TaskPool. This means that a TaskPool will likely store a substantial collection of errors, where all but the first are CancellationError. This is a substantial regression in convenience for the mainline case, with additional costs in storage, without providing any more meaningful information.

For these reasons we've chosen the middle behaviour, where only one error is thrown. We think there is merit in throwing an aggregate error, however, and we'd like community feedback on this alternative.

Child Task for reaping

An alternative would be to have Task Group spin up a child Task that can be used to consume tasks from the group. The API surface would look something like this:

withTaskGroupWithChildTask(of: Void.self) { group in
    group.addTask {
        handleConnection(newConnection)
    }
} consumer: { group in
    for await _ in group { }
}

The advantage of this variant is that it is substantially more flexible, and allows non-Void-returning tasks. The downside of this variant is that it muddies the water on the question of whether Task Groups are Sendable (requiring a specific-exemption for this use-case) and forces users to understand the lifetime of a pair of different closures.

28 Likes

Wouldn’t another possible solution to the motivating problem be a pollNext method which would return a result if one is immediately available without waiting?
This wouldn’t provide the sendability benefit though.

That provides partial solutions, but it delays memory reclaim until the next work item comes in on the async sequence, and it provides a pretty gnarly footgun (if you ever forget to call it in your work loop, congrats, you have a memory leak).

1 Like

Small clarification: I think this was meant to say "Conceptually a TaskGroup hides..." or "Conceptually both, TaskGroup and TaskPool hide...", right? Cause right after that sentence, you go on to explain what consequences this implies for TaskGroup due to the next() method. If I got it right the difference is that the lack of next() for TaskPool is then the benefit: That means no consumers for the queue, so it can work on a more constrained problem, as you call it.

Generally speaking this seems like a good thing to have. I think it's good that it would be its own type, as imo the burden on the developer to learn about this is not actually much bigger than for a modified TaskGroup. It's still two things, and while you'd "cognitively save" on learning TaskPool if TaskGroup got a discardResults, understanding what that parameter does eats this up again. Plus you wouldn't have a Sendable type still.

I'm a little split on the single error thing. I understand the rationale and have actually encountered this problem when writing custom GCD code in a client (doing a whole bunch of REST API calls with generic code).
At first I went with a "once one fails, give an error for that and stop" approach as well, but later on changed it. I'm sure I'm overlooking something, but in that case I didn't even want to auto-cancel all other tasks.

To perhaps elaborate a bit:
I had multiple different paginated APIs (that were similar enough to enable me to treat them all with generic code for parsing, etc.). From all of them I would want to download in parallel. The ability to recursively add new tasks for next pages from any API (i.e. the TaskPool being Sendable) seems great for this. However, if one of these tasks throws an error (e.g. while parsing) I wouldn't want to stop all other tasks necessarily, each one should get a chance to "go as far as it can".

I could imagine simply not throwing in case of a parsing error (i.e. from the TaskPool's perspective encountering a bad response of a single API stops new tasks for that particular API being added), but that seems counter-intuitive.

All of this is hypothetical, in my case I solved this differently in the end, I just thought this could be a valid example for a use-case. Even with the single error I would still consider this a very worthwhile API.

1 Like

In general, I am a big +1 on solving this problem space. I have come across this problems on various levels of the stack. From bootstrapping server side applications to implementing streaming RPC methods. I have some remarks and open questions.

Introducing a new type: I agree that we shouldn’t overload the current task group to also provide this kind of behavior. It would make the implementation more complex and also increase the mental overhead when reading code.
I also like the name TaskPool; however, I was wondering whether we could also provide something similar to async let for this use-case. I haven’t fully thought this through (still out on vacation :sweat_smile:).

Sendable: Still a bit torn on this one. Wouldn’t it cause issues if you send the pool to an unstructured task? Couldn’t you then in theory add child tasks to an already finished pool? Since we don’t have a concept of types that are sendable only to structured child tasks, I am a bit hesitant to adopt this. Maybe we could provide the pool as a parameter in the addTask body closure.

Error handling: IMO only throwing the first error is fine. This is similar to how next() followed by cancelAll() works. If we do the cancelAll() implicitly here then I expect most of the other child tasks to throw a CancellationError anyhow which provides not a lot of benefits.

One last random thought: What if the withTaskPool methods would accept two closures. One where you get a parameter with which you can add child tasks and cancel and another closure where you can consume the results and cancel. This way we could make the individual child tasks even return things and we could handle each error individually. I haven’t thought this through from an implementation point of view and would rely on @ktoso here. But basically splitting the spawning and consuming side into two distinct closures.

Overall, very happy to see movement here. We desperately need a solution for this!

1 Like

One other type of Task relationship that is difficult/impossible to express with structured concurrency, is one where a Task may initiate a child-Task to be potentially shared amongst sibling Tasks. Maybe this is easier to imagine as an inverse Task tree.

Currently, as it needs to be shared, the 'child' Task must actually be detached. Then, as it's detached its priority needs to be decided upfront, whereas ideally it would probably adopt the priority of its highest priority parent.

An example of this relationship would be a multicast asynchronous sequence, where multiple consumers are dependent on one producer.

I'm not sure that a TaskPool would be able to accommodate this model, but if we're thinking about these relationships, I thought I'd mention it as something to perhaps consider in the wider picture.

I think this is a slightly different problem that we can’t solve with pools. The problem as you pointed out is that we have multiple tasks that want to consume from one producing task and we cannot model this right now.

This is similar to the discussion I brought up about the usage of unstructured concurrency in async algos and shows another case where we hit the current limits of both structured concurrency and AsyncSequences. Maybe it is worth adding this over to the other post and something we should also explore if we look into generators.

1 Like

Having a way to opt out of being able to next() makes sense to me, and using a new type to make that distinction is appealing for the reasons you laid out. However, it seems to me that TaskGroup itself ought to also have the same capability for child tasks to add to the group; to me, that seems like an oversight that it doesn't, since it doesn't really affect the "structured"-ness of the task tree, and as you noted, Trio and other libraries allow this.

3 Likes

Is auto-cancellation of the entire pool actually a desired behavior? Why should an error from one connection tear down all of the server’s other connections? How would I implement a server that doesn’t have this behavior while still using native Swift error handling?

What if addTask() took an error handler? Or, actually, just a completion handler that could inspect whether the task succeeded or failed? This would extend the lifetime of the task to the completion handler, but it avoids the leaking problem because the handler can’t not execute. This also eliminates the need for the distinct ThrowingTaskPool type:

public struct TaskPool: Sendable {
  public mutating func addTask(
    priority: TaskPriority? = nil,
    operation: @Sendable @escaping () async throws -> Void,
    completionHandler: (@Sendable @concurrent (Task) -> Void)? = nil
  )
}

I haven’t wrapped my head around the consequences of combining this with withTaskCancellationHandler().

If auto-cancellation is desired, perhaps it can be achieved by throwing from completionHandler.

Was literally just thinking about a similar issue when I went to check on the forums this morning. Big +1 from me overall!

I'm a little confused on how sendability will work with mutating methods like addTask. For example, in this case:

try await withTaskPool { pool in
    pool.addTask {
        let result = try await doSomeWork()
        if let followOn = result.followOnOperation {
            pool.addTask {
                handleFollowOn(followOn)
            }
        }
    }
}

How would this compile if pool is a mutable inout variable? Unless I'm missing something, it seems to me that the only way it could work is if it were an immutable type with non-mutating methods, similar to the various Continuation types in the standard library.

1 Like

I don't love the name TaskPool - given prior art with thread and connection pools, a task pool feels to me like it would contain some bounded number of Tasks or that Tasks will be reused after they're complete, which isn't what's being proposed. It also feels like the mental model for interacting with this type is extremely similar to the existing TaskGroup, so introducing an entirely new name for something that isn't fundamentally distinct behavior could lead to confusion.

However, I don't have any great alternatives off the top of my head, so pool is probably fine if no better alternatives can be brainstormed. The only thing I can think of is something like ReapingTaskGroup? That name communicates that it's essentially a refinement of the existing TaskGroup type, but unfortunately makes ThrowingReapingTaskGroup a little awkward if that throwing variant is ultimately added.

3 Likes

That's a fair concern yes. As usual, the lifetime of the pool/group is bounded by the with{} so it cannot outlive it... Indeed perhaps passing the "thing that allows you to add tasks" directly into addTask { and make sure it does not escape could be a solution for this.

We could view this as an exercise in splitting up the "adding" and "consuming" sides of a task group. And the proposed API is just refusing to give you the "consuming" API. Perhaps we could model this as two protocols, with TaskGroup conforming to both, but a pool to only one... hm.

2 Likes

The primary blocker AFAIR was only the fact it would end up forcing us to make group Sendable, which we do not want to do (for the MPMC reasons mentioned).

However... perhaps we can take the same idea from the pool and say that the "writing side" is offered to addTask, in both a pool and a group, like this?

(group or pool).addTask { gp in 
    gp.addTask { ... }
}

that looks quite promising... We'd then also want to enforce that gp cannot escape (e.g. to an unstructured task), and we'd have a safe version of the APIs allowing tasks to be added from child tasks - definitely worth trying out :thinking:

I like this approach and I agree it is worth trying out!

Does this actually solve the problem that we are trying to solve with this proposal? We could then iterate the sequence in a child task already and spawn new child tasks while at the same time consuming the results, e.g.:

withTaskGroup(of: Void.self) { group in
    group.addTask { group in
        for await element in sequence {
            group.addTask {}
        }
    }

    for await _ in group {}
}

Haven’t fully thought this through just something that occurred to me

As I was wondering the same above, my two cents on this: I don't think adding more completion handlers for this would be ideal. After all, structured concurrency is supposed to help us reduce the completion handler hell we often had to endure when doing this in the past.

I'd like it if ThrowingTaskPool (however it may be named) were configurable in that regard somehow. Perhaps this could be done by passing a simple boolean to withTaskPool and if that's true a thrown error is not the error originally thrown by a Task, but instead some kind of PoolError.multipleErrors([any Error]) case?
I could also envision a way where withTaskPool does not throw at all if it is passed such a collection type by the caller, but that would probably look weird (as with its signature it still would require a try, I guess).

Just an "error handler" (especially per addTask) is not going to be helpful here. An "error handler" in structured concurrency is the same as in normal structured code: do {} catch {} and we will not be inventing other ways just to "catch" an error.

We could however consider giving this group an error reducer, which would allow making the "what error gets thrown" something the end user has some say in. E.g. it could drop CancellationError as "not interesting" but combine all others in some aggregate error. We would not want to be doing this on behalf of the user though. I should say though that the usefulness of this is rather low in practice... and that it could be added in a future revision as well. Thanks to @John_McCall for the idea provided off-thread.

Technically, we could add a withThrowingTaskPool(of: ..., reduceError: (previousError: Error, latestError: Error) -> Error) and we could implement this efficiently if we had to. This could also be a future addition if we're not sure about it still.
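
For illustration, such a reducer might be used like this; everything here is hypothetical and follows the sketched signature above, with doSomeWork standing in for a throwing child operation:

// Sketch only: reduceError is the hypothetical parameter sketched above.
try await withThrowingTaskPool(of: Void.self, reduceError: { previousError, latestError in
    // Treat CancellationError as noise and keep the first interesting error.
    previousError is CancellationError ? latestError : previousError
}) { pool in
    pool.addTask { try await doSomeWork(1) }
    pool.addTask { try await doSomeWork(2) }
}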

We're still thinking if we'll end up with a new type or can wrangle the TaskGroup into enough flexibility to support these desired semantics... I'll provide an update once we had some time to implement the various approaches :thinking:

1 Like

This is reasonable, but I don’t know how to do this without making TaskGroup Sendable or splitting it into two types.

Yes, very much. Auto-cancellation affects unhandled errors only. If you don’t want your server taken down, handle the error. But if you can’t, the only logical pattern is to exit.

We can already spell this with do…catch.

3 Likes

This was mentioned in this thread in passing, but to make it more explicit what I think may be a possible solution:

withTaskGroup { group in // add + consume
  group.addTask { allowAddingTasksToGroup in  // add only
    allowAddingTasksToGroup.addTask { ... } 
  }
}

We could have two protocols for the "write" and "read" sides, and separate the addTask out into the prior. This seems like it could work, I'll look into this some more.

2 Likes

This would perfectly match what I had in mind for that use case. And since it is not the most pressing thing (there are other patterns you can build on top of the proposal as it already is) I would also be perfectly fine to see this in a future addition/revision

2 Likes

I don’t think it’s correct to say it “affects unhandled errors only.” Auto-cancellation affects errors that are not handled within the Task. I think it’s perfectly reasonable to want to handle errors centrally, outside the individual tasks, and for the client to decide whether the failure of an individual task should cause the failure of all other tasks:

/*
 * simple-rsync.swift
 */

enum Command {
  case copy(from: Path, to: Path)
  case rename(from: Path, to: Path)
  case delete(at: Path)
}

struct PermissionsError : Error {
  let path: Path
}

func processCommandList(_ commands: [Command]) async {
  await withThrowingTaskPool { pool in
    for command in commands {
      // determine the operation to perform outside the task so we don’t have to copy the Command into the task
      switch command {
        case let .copy(from: source, to: dest): pool.addTask { try _copy(from: source, to: dest) }
        case let .rename(from: source, to: dest): pool.addTask { try _rename(from: source, to: dest) }
        case let .delete(at: path): pool.addTask { try _delete(at: path) }
      }
    }
  } taskCompletionHandler: { task /* : Task<Void, PermissionsError> */ in
    if case let .failure(error) = task.result {
      // like rsync(1), log an error for this operation, but continue with the others
      print("Insufficient permissions to access \(error.path)")
    }
  }
}

To my eye, the biggest flaw with the current proposal is that a developer can write a task that doesn’t handle its own errors, and the compiler says nothing. The consequences manifest only at runtime, in the form of cancelling other tasks. “My server is resetting all of its connections, but my logs only show extremely infrequent errors” sounds like a very confusing SEV to debug!

Edit: Per @ktoso’s comment about Task reducers, I think I can phrase my rsync example in those terms:

func processCommandList(_ commands: [Command]) async {
  do {
    try await withThrowingTaskPool { pool in
      for command in commands {
        // determine the operation to perform outside the task so we don’t have to copy the Command into the task
        switch command {
          case let .copy(from: source, to: dest): pool.addTask { try _copy(from: source, to: dest) }
          case let .rename(from: source, to: dest): pool.addTask { try _rename(from: source, to: dest) }
          case let .delete(at: path): pool.addTask { try _delete(at: path) }
        }
      }
    } reducingResults: { result /* : Result<Void, any Error> */ in
      guard case .failure(let error as PermissionsError) = result else {
        // bubble unknown errors up to outer do/catch
        return result
      }
      // like rsync(1), log an error for failing operations, but continue with the others
      print("Insufficient permissions to access \(error.path)")
      return nil // don’t bubble this error up
    }
  } catch {
    // something other than a PermissionsError happened
    fatalError("Unhandled error: \(String(describing: error))")
  }
}