Hello everyone,
I'd like to pitch a new addition to the concurrency library: Task Priority Escalation APIs.
This feature exposes the internal mechanisms of task priority escalation and handling thereof to developers, enabling the manual the propagation of task priority escalation to un-structured tasks.
It's a rather specialized feature we fully expect very few developers to touch explicitly, however it is necessary for performance improvements of some specific patterns which do sometimes need to reach for unstructured concurrency.
The pitch is a PR over here so please suggest any typo fixes on the PR directly. For convenience, I'm pasting the complete initial text below:
Task Priority Escalation APIs
- Proposal: SE-NNNN
- Authors: Konrad 'ktoso' Malawski
- Review Manager: TBD
- Status: Awaiting implementation
- Implementation: [Concurrency] Task priority escalation APIs by ktoso · Pull Request #78625 · swiftlang/swift · GitHub
- Review: ...
Introduction
A large part of Swift Concurrency is its Structured Concurrency model, in which tasks automatically form parent-child relationships, and inherit certain traits from their parent task. For example, a task started from a medium priority task, also starts on the medium priority, and not only that – if the parent task gets awaited on from a higher priority task, the parent's as well as all of its child tasks' task priority will be escalated in order to avoid priority inversion problems.
This feature is automatic and works transparently for any structured task hierarchy. This proposal will discuss exposing user-facing APIs which can be used to participate in task priority escalation.
Motivation
Generally developers can and should rely on the automatic task priority escalation happening transparently–at least for as long as all tasks necessary to escalate are created using structured concurrency primitives (task groups and async let
). However, sometimes it is not possible to entirely avoid creating an unstructured task.
One such example is the async sequence merge
operation from the swift-async-algorithms project where the implementation is forced to create an unstructured task for iterating the upstream sequences, which must outlive downstream calls. These libraries would like to participate in task priority escalation to boost the priority of the upstream consuming task, however today lack the API to do so.
// SIMPLIFIED EXAMPLE CODE
// Complete source: https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/Merge/MergeStorage.swift
struct AsyncMergeSequenceIterator: AsyncIterator {
struct State {
var task: Task<Void, any Error>? // unstructured upstream consumer task
var buffer: Deque<Element>
var upstreamContinuations: [UnsafeContinuation<Void, Error>]
var downstreamContinuation: UnsafeContinuation<Element?, Error>?
}
let state = Mutex<State>(State())
func next() async throws {
self.state.withLock { state in
if state.task == nil {
state.task = Task {
// Consume from the base iterators
// ...
}
}
}
if let element = self.state.withLock { $0.buffer.popFirst() } {
return element
} else {
// We are handling cancellation here and need to handle task escalation here as well
try await withTaskCancellationHandler {
// HERE: need to handle priority escalation and boost `state.task`
try await withCheckedContinuation { cont in
self.state.withLock { $0.consumerContinuation = cont }
}
} onCancel: {
// trigger cancellation of tasks and fail continuations
}
}
}
}
The above example showcases a common pattern: often an unchecked continuation is paired with a Task used to complete it. Around the suspension on the continuation, waiting for it to be resumed, developers often install a task cancellation handler in order to potentially break out of potentially unbounded waiting for a continuation to be resumed. Around the same suspension (marked with HERE
in the snippet above), we might want to insert a task priority escalation handler in order to priority boost the task that is used to resume the continuation. This can be important for correctness and performance of such operations, so we should find a way to offer these libraries to participate in task priority handling.
Another example of libraries which may want to reach for manual task priority escalation APIs are libraries which facilitate communication across process boundaries, and would like to react to priority escalation and propagate it to a different process. Relying on the built-in priority escalation mechanisms won't work, because they are necessarily in-process, so libraries like this need to be able to participate and be notified when priority escalation happens, and also be able to efficiently cause the escalation inside the other process.
Proposed solution
In order to address the above use-cases, we propose to add a pair of APIs: to react to priority escalation happening within a block of code, and an API to cause a priority escalation without resorting to trickery using creating new tasks whose only purpose is to escalate the priority of some other task:
let m: Mutex<Task<Void, Never>?>
await withTaskPriorityEscalationHandler {
await withCheckedContinuation { cc in
let t = Task { cc.resume() }
m.withLock { $0 = t }
} onPriorityEscalated: { newPriority in
let t = m.withLock { $0 }
Task.escalatePriority(t, to: newPriority)
}
}
Detailed design
We propose the addition of a task priority escalation handler, similar to task cancellation handlers already present in the concurrency library:
public func withTaskPriorityEscalationHandler<T, E>(
operation: () async throws(E) -> T,
onPriorityEscalated handler: @Sendable (TaskPriority) -> Void,
isolation: isolated (any Actor)? = #isolation
) async throws(E) -> T
The shape of this API is similar to the withTaskCancellationHandler
API present since initial Swift Concurrency release, however–unlike a cancellation handler–the onPriorityEscalated
callback may be triggered multiple times. The TaskPriority
passed to the handler is the "new priority" the surrounding task was escalated to.
It is guaranteed that priority is ever only increasing, as Swift Concurrency does not allow for a task priority to ever be lowered after it has been escalated. If attempts are made to escalate the task priority from multiple other threads to the same priority, the handler will only trigger once. However if priority is escalated to a high and then even higher priority, the handler may be invoked twice.
Task escalation handlers are inherently racy, and may sometimes miss an escalation, for example if it happened immediately before the handler was installed, like this:
// priority: low
// priority: high!
await withTaskPriorityEscalationHandler {
await work()
} onPriorityEscalated: { newPriority in // may not be triggered if ->high escalation happened before handler was installed
// do something
}
This is inherent to the nature of priority escalation and even with this behavior, we believe handlers are a worthy addition. One could also check for the Task.currentPriority
and match it against our expectations inside the operation
wrapped by the withTaskPriorityEscalationHandler
if that could be useful to then perform the operation at an already immediately heightened priority.
Escalation handlers work with any existing task kind (child, unstructured, unstructured detached), and trigger at every level of the hierarchy in an "outside in" order:
let t = Task {
await withTaskPriorityEscalationHandler {
await withTaskGroup { group in
group.addTask {
await withTaskPriorityEscalationHandler {
try? await Task.sleep(for: .seconds(1))
} onPriorityEscalated: { newPriority in print("inner: \(newPriority)") }
}
}
} onPriorityEscalated: { newPriority in print("outer: \(newPriority)") }
}
// escalate t -> high
// "outer: high"
// "inner: high"
The API can also be freely composed with withTaskCancellationHandler
or there may even be multiple task escalation handlers registered on the same task (but in different pieces of the code).
Manually propagating priority escalation
While generally developers should not rely on manual task escalation handling, this API also does introduce a manual way to escalate a task's priority. Primarily this should be used in combination with a task escalation handler to propagate an escalation to an unstructured task which otherwise would miss reacting to the escalation.
The escalatePriority
API is offered as a static method on Task
in order to slightly hide it away from using it accidentally by stumbling upon it if it were directly declared as a member method of a Task.
extension Task {
public static func escalatePriority(_ task: Task, to newPriority: TaskPriority)
public static func escalatePriority(_ task: UnsafeCurrentTask, to newPriority: TaskPriority)
}
It is possible to escalate both a Task
and UnsafeCurrentTask
, however great care must be taken to not attempt to escalate an unsafe task handle if the task has already been destroyed. The Task
accepting API is always safe.
Currently it is not possible to escalate a specific child task (created by async let
or a task group) because those do not return task handles. We are interested in exposing task handles to child tasks in the future, and this design could then be easily amended to gain API to support such child task handles as well.
Source compatibility
This proposal is purely additive, and does not cause any source compatibility issues.
ABI compatibility
This proposal is purely ABI additive.
Alternatives considered
New Continuation APIs
We did consider if offering a new kind of continuation might be easier to work with for developers. One shape this might take is:
struct State {
var cc = CheckedContinuation<Void, any Error>?
var task: Task<Void, any Error>?
}
let C: Mutex<State>
await withCheckedContinuation2 { cc in
// ...
C.withLock { $0.cc = cc }
let t = Task {
C.withLock {
$0.cc?.resume() // maybe we'd need to add 'tryResume'
}
}
C.withLock { $0.task = t }
} onCancel: { cc in
// remember the cc can only be resumed once; we'd need to offer 'tryResume'
cc.resume(throwing: CancellationError())
} onPriorityEscalated: { cc, newPriority in
print("new priority: \(newPriority)")
C.withLock { Task.escalatePriority($0.task, to: newPriority) }
}
While at first this looks promising, we did not really remove much of the complexity -- careful locking is still necessary, and passing the continuation into the closures only makes it more error prone than not since it has become easier to accidentally multi-resume a continuation. This also does not compose well, and would only be offered around continuations, even if not all use-cases must necessarily suspend on a continuation to benefit from the priority escalation handling.
Overall, this seems like a tightly knit API that changes current idioms of with...Handler
without really saving us from the inherent complexity of these handlers being invoked concurrently, and limiting the usefulness of those handlers to just "around a continuation" which may not always be the case.