- Authors: Cory Benfield, Konrad Malawski
- Status: Pitch
- Implementation: https://github.com/apple/swift/pull/62271
## Introduction
Task groups are the building block of structured concurrency, allowing for the Swift runtime to relate groups of tasks together. This enables powerful features such as automatic cancellation propagation, correctly propagating errors, and ensuring well-defined lifetimes, as well as providing diagnostic information to programming tools.
The version of Task Groups introduced in SE-0304 provides all of these features. However, it also provides the ability to propagate the return values of child tasks to the user of the task group. That capability imposes an unexpected limitation in some use-cases.
As users of Task Groups are able to retrieve the return values of child tasks, it implicitly follows that the Task Group preserves at least the `Result` of any completed child task. As a practical matter, the task group actually preserves the entire `Task` object. This data is preserved until the user consumes it via one of the Task Group consumption APIs, whether that is `next()` or by iterating the Task Group.
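For illustration, a minimal sketch of the two consumption paths (the wrapper function and values here are ours, not part of any proposed API):

```swift
// A minimal illustration of the two consumption APIs: next() and iteration.
// Both paths remove completed child tasks from the group's internal queue.
func sumOfChildResults() async throws -> Int {
    try await withThrowingTaskGroup(of: Int.self, returning: Int.self) { group in
        group.addTask { 1 }
        group.addTask { 2 }

        // Pop a single completed result explicitly…
        var total = try await group.next() ?? 0

        // …or iterate the group as an AsyncSequence to drain the rest.
        for try await value in group {
            total += value
        }
        return total
    }
}
```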
The result of this is that Task Groups are ill-suited to running for a potentially unbounded amount of time. An example of such a use-case is managing connections accepted from a listening socket. A simplified example of such a workload might be:
```swift
try await withThrowingTaskGroup(of: Void.self) { group in
    while let newConnection = try await listeningSocket.accept() {
        group.addTask {
            handleConnection(newConnection)
        }
    }
}
```
As written, this task group will leak all the child `Task` objects until the listening socket either terminates or throws. If this was written for a long-running server, it is entirely possible for this Task Group to survive for a period of days, leaking thousands of `Task` objects. For stable servers, this will eventually drive the process into memory exhaustion, forcing it to be killed by the OS.
The current implementation of Task Groups does not provide a practical way to avoid this issue. Task Groups are (correctly) not `Sendable`, so neither the consumption of completed `Task` results nor the submission of new work can be moved to a separate `Task`.
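For example, one might be tempted to drain the group from an unstructured `Task`; the compiler rejects this. A sketch of the rejected shape, shown only to illustrate the restriction (it does not compile):

```swift
try await withThrowingTaskGroup(of: Void.self) { group in
    // error: 'group' is not Sendable (and is an inout parameter), so it
    // cannot be captured by the escaping, @Sendable closure of a new Task.
    Task {
        while try await group.next() != nil { }
    }
}
```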
The most natural attempt to avoid this unbounded memory consumption would be to attempt to occasionally purge the completed task results. An example might be:
```swift
try await withThrowingTaskGroup(of: Void.self) { group in
    while let newConnection = try await listeningSocket.accept() {
        group.addTask {
            handleConnection(newConnection)
        }
        try await group.next()
    }
}
```
Unfortunately, all of the methods for attempting to pop the queue of completed `Task`s will suspend if all currently live child `Task`s are executing. This means that the above pattern (or any similar pattern) is at risk of occasional livelocks, where pending connections could be accepted, but the `Task` is blocked waiting for existing work to complete.
The only design pattern that avoids this issue involves forcibly bounding the maximum concurrency of the Task Group. This pattern looks something like the following:
```swift
try await withThrowingTaskGroup(of: Void.self) { group in
    // Fill the task group up to maxConcurrency
    for _ in 0..<maxConcurrency {
        guard let newConnection = try await listeningSocket.accept() else {
            break
        }
        group.addTask { handleConnection(newConnection) }
    }

    // Now follow a one-in-one-out pattern
    while true {
        _ = try await group.next()
        guard let newConnection = try await listeningSocket.accept() else {
            break
        }
        group.addTask { handleConnection(newConnection) }
    }
}
```
While this is workable, it forces users to determine a value for `maxConcurrency`. This is frequently very hard to decide a priori. In practice, users tend to take a wild guess, and either choose a value far too large (wasting memory) or far too low (underutilizing the system). While there is value in developing a strategy for bounding the maximum concurrency of a `TaskGroup`, that problem is sufficiently complex to be worth its own separate discussion.
## Proposed Solution
We propose adding two new types, called `TaskPool` and `ThrowingTaskPool`. `TaskPool`s are very similar to `TaskGroup`s, but differ in the following ways:
- Child tasks can only have a `Void` return type.
- `[Throwing]TaskPool`s have no `next()` method and are not `AsyncSequence`s.
- `[Throwing]TaskPool`s automatically clean up their child `Task`s when those `Task`s complete.
They maintain many of the same behaviours as `TaskGroup`, albeit in some cases with slightly different manifestations:
- `ThrowingTaskPool`s are automatically cancelled when one of their child `Task`s terminates with a thrown error.
- `ThrowingTaskPool`s that are cancelled in this way will, after awaiting all their child `Task`s, throw the error that originally caused them to be auto-cancelled.
- Automatic cancellation propagation works as usual, so cancelling the `Task` that owns a `[Throwing]TaskPool` automatically cancels all child `Task`s.
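With these semantics, the accept-loop from the introduction can be written without leaking completed child tasks. A sketch against the proposed API, reusing the placeholder `listeningSocket` and `handleConnection` from earlier:

```swift
try await withThrowingTaskPool(returning: Void.self) { pool in
    while let newConnection = try await listeningSocket.accept() {
        pool.addTask {
            handleConnection(newConnection)
        }
    }
    // Completed child tasks are released as soon as they finish, so memory
    // use is bounded by the number of live connections, not by the total
    // number ever accepted.
}
```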
While it is conceptually possible to implement all of this as an optional behaviour on `TaskGroup` (see "Alternatives Considered" for more details), there is one valuable reason to define a new type: as a result of the missing `next()` method, `TaskPool` can be `Sendable`.
### Sendable Task Pool
Conceptually a `TaskPool` hides a multi-producer queue of completed `Task`s. Whenever a child `Task` completes, either successfully or with an error, it adds itself to the queue of completed `Task`s that is owned by the parent `Task`.
To manage the complexity of this queue, it was decided to make `TaskGroup` non-`Sendable`. This means that the `next()` method on `TaskGroup` can only be called from a single `Task`. The result of this change is that the queue in question only needs to be a multi-producer-single-consumer queue. Allowing the `next()` method to be called from multiple `Task`s requires the queue to be multi-producer-multi-consumer, a substantially harder queue type to build in a performant fashion.
Because `TaskPool` does not have a `next()` method, this problem does not exist. Conceptually, there are no consumers for the queue at all. We still need to cancel the parent `Task` safely when the first child `Task` completes, but that is a substantially more constrained problem that is far easier to write a performant solution for. As a result, there is no longer any need to prevent other `Task`s from having access to the `TaskPool`.
This leads to two new capabilities. First, we can safely make `[Throwing]TaskPool` `Sendable`. This enables us to pass the `TaskPool` into child `Task`s, and so enables those child `Task`s to themselves add further child `Task`s. That is, we can now enable the following pattern:
```swift
try await withThrowingTaskPool(returning: Void.self) { pool in
    pool.addTask {
        let result = try await doSomeWork()
        if let followOn = result.followOnOperation {
            pool.addTask {
                handleFollowOn(followOn)
            }
        }
    }
}
```
This capability allows `TaskPool`s to encapsulate chains of work that may run in parallel. While this removes some of the benefits of structured concurrency, the extra flexibility is very valuable. This capability is present in structured concurrency implementations in other languages (for example, Trio's `Nursery`). Having it available in Swift Concurrency enables more complex task trees to be built without relying on `AsyncSequence` to provide the communication substrate.
## API Surface
```swift
public func withTaskPool<PoolResult>(
    returning returnType: PoolResult.Type = PoolResult.self,
    body: (inout TaskPool) async throws -> PoolResult
) async rethrows -> PoolResult

@frozen
public struct TaskPool: Sendable {
    public mutating func addTask(
        priority: TaskPriority? = nil,
        operation: @Sendable @escaping () async -> Void
    )

    public mutating func addTaskUnlessCancelled(
        priority: TaskPriority? = nil,
        operation: @Sendable @escaping () async -> Void
    ) -> Bool

    public mutating func waitForAll() async

    public var isEmpty: Bool

    public func cancelAll()

    public var isCancelled: Bool
}

public func withThrowingTaskPool<PoolResult>(
    returning returnType: PoolResult.Type = PoolResult.self,
    body: (inout ThrowingTaskPool) async throws -> PoolResult
) async throws -> PoolResult

@frozen
public struct ThrowingTaskPool: Sendable {
    public mutating func addTask(
        priority: TaskPriority? = nil,
        operation: @Sendable @escaping () async throws -> Void
    )

    public mutating func addTaskUnlessCancelled(
        priority: TaskPriority? = nil,
        operation: @Sendable @escaping () async throws -> Void
    ) -> Bool

    public mutating func waitForAll() async throws

    public var isEmpty: Bool

    public func cancelAll()

    public var isCancelled: Bool
}
```
## Alternatives Considered
### Alternative behaviour on `TaskGroup`
Instead of defining a new type, we could add a new boolean parameter, `discardResults`, to `withTaskGroup` and `withThrowingTaskGroup`. When set to `true`, `[Throwing]TaskGroup.next()` would always return `nil`, and all child `Task`s would be automatically cleaned up promptly.
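Under that alternative, the accept-loop from the introduction might be spelled roughly like this (`discardResults` is the hypothetical parameter described above, not an existing API):

```swift
// Hypothetical spelling: a flag on the existing entry point rather than a new type.
try await withThrowingTaskGroup(of: Void.self, discardResults: true) { group in
    while let newConnection = try await listeningSocket.accept() {
        group.addTask {
            handleConnection(newConnection)
        }
    }
    // next() would always return nil here, and completed child tasks
    // would be cleaned up as soon as they finish.
}
```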
This has the major advantage of avoiding introducing a new concept. Users who are already familiar with `TaskGroup` do not need to be confronted with, and understand, how `TaskPool`s are different. This reduction in cognitive burden is valuable, as structured concurrency can already be quite complex to learn.
There are two disadvantages to this approach. The first is that we cannot make `TaskGroup` conditionally `Sendable` based on a runtime-provided parameter, so we cannot gain the capability to add child `Task`s from within child `Task`s. The second is that we would have a strange, vestigial `next()` function that always returns `nil`. This is a fairly fundamental behavioural change, particularly as it affects the `AsyncSequence` conformance of `TaskGroup`.
For these reasons we have decided to pitch a new type, but the community should strongly consider whether the cognitive burden of the new type is justified.
### Error throwing behaviour
The pitch proposes that `ThrowingTaskPool` will throw only the first error thrown by a child `Task`. This means that all subsequent errors will be discarded, which is an unfortunate loss of information. Two alternative behaviours could be chosen: we could not add `ThrowingTaskPool` at all, or we could throw an aggregate error that contains all errors thrown by the child `Task`s.
Not adding `ThrowingTaskPool` is a substantial ergonomic headache. Automatic error propagation is one of the great features of structured concurrency, and not being able to use it with `TaskPool`s is an unnecessary limitation, especially as it's not particularly technically challenging to propagate errors. For this reason, we do not think it's wise to omit `ThrowingTaskPool`.
The other alternative is to throw an aggregate error. This would require that `TaskPool` persist all (or almost all) errors thrown by child tasks and merge them together into a single error `struct` that is thrown. This idea is a mixed bag.
The main advantage of throwing an aggregate error is that no information is lost. Programs can compute on all errors that were thrown, and at the very least can log or provide other metrics based on those errors. Avoiding data loss in this way is valuable, and gives programmers more flexibility.
Throwing an aggregate error has two principal disadvantages. The first is that aggregate errors do not behave gracefully in `catch` statements. If a child task has thrown `MyModuleError`, programmers would like to write `catch is MyModuleError` in order to handle it. Aggregate errors break this pattern, even if only one error is thrown: programmers have to write `catch let error as MyAggregateError where error.baseErrors.contains(where: { $0 is MyModuleError })`, or something else equally painful.
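To make the contrast concrete, a sketch using an illustrative `MyAggregateError` shape (not part of this proposal):

```swift
struct MyModuleError: Error {}

// Illustrative only: one possible shape for an aggregate error.
struct MyAggregateError: Error {
    var baseErrors: [Error]
}

func handleFailures(of work: () async throws -> Void) async {
    do {
        try await work()
    } catch is MyModuleError {
        // With a single propagated error, the natural pattern works.
    } catch let error as MyAggregateError where error.baseErrors.contains(where: { $0 is MyModuleError }) {
        // With an aggregate error, the interesting failure has to be dug out
        // by hand, even when only one error was actually thrown.
    } catch {
        // Everything else.
    }
}
```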
The other main disadvantage is the storage bloat from `CancellationError`. The first thrown error will auto-cancel all child `Task`s. This is great, but that cancellation will likely manifest in a thrown `CancellationError`, which will presumably bubble to the top and be handled by the `TaskPool`. This means that a `TaskPool` will likely store a substantial collection of errors, where all but the first are `CancellationError`s. This is a substantial regression in convenience for the mainline case, with additional costs in storage, without providing any more meaningful information.
For these reasons we've chosen the middle behaviour, where only one error is thrown. We think there is merit in throwing an aggregate error, however, and we'd like community feedback on this alternative.
### Child Task for reaping
An alternative would be to have the Task Group spin up a child `Task` that can be used to consume tasks from the group. The API surface would look something like this:
```swift
withTaskGroupWithChildTask(of: Void.self) { group in
    group.addTask {
        handleConnection(newConnection)
    }
} consumer: { group in
    for task in group { }
}
```
The advantage of this variant is that it is substantially more flexible, and allows non-`Void`-returning tasks. The downside is that it muddies the water on the question of whether Task Groups are `Sendable` (requiring a specific exemption for this use-case) and forces users to understand the lifetime of a pair of different closures.