[Pitch] Task Priority Escalation APIs

Hello everyone,
I'd like to pitch a new addition to the concurrency library: Task Priority Escalation APIs.

This feature exposes the internal mechanisms of task priority escalation and handling thereof to developers, enabling the manual the propagation of task priority escalation to un-structured tasks.

It's a rather specialized feature we fully expect very few developers to touch explicitly, however it is necessary for performance improvements of some specific patterns which do sometimes need to reach for unstructured concurrency.

The pitch is a PR over here so please suggest any typo fixes on the PR directly. For convenience, I'm pasting the complete initial text below:


Task Priority Escalation APIs

Introduction

A large part of Swift Concurrency is its Structured Concurrency model, in which tasks automatically form parent-child relationships, and inherit certain traits from their parent task. For example, a task started from a medium priority task, also starts on the medium priority, and not only that – if the parent task gets awaited on from a higher priority task, the parent's as well as all of its child tasks' task priority will be escalated in order to avoid priority inversion problems.

This feature is automatic and works transparently for any structured task hierarchy. This proposal will discuss exposing user-facing APIs which can be used to participate in task priority escalation.

Motivation

Generally developers can and should rely on the automatic task priority escalation happening transparently–at least for as long as all tasks necessary to escalate are created using structured concurrency primitives (task groups and async let). However, sometimes it is not possible to entirely avoid creating an unstructured task.

One such example is the async sequence merge operation from the swift-async-algorithms project where the implementation is forced to create an unstructured task for iterating the upstream sequences, which must outlive downstream calls. These libraries would like to participate in task priority escalation to boost the priority of the upstream consuming task, however today lack the API to do so.

// SIMPLIFIED EXAMPLE CODE
// Complete source: https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/Merge/MergeStorage.swift

struct AsyncMergeSequenceIterator: AsyncIterator {
  struct State {
    var task: Task<Void, any Error>? // unstructured upstream consumer task
    var buffer: Deque<Element>
    var upstreamContinuations: [UnsafeContinuation<Void, Error>]
    var downstreamContinuation: UnsafeContinuation<Element?, Error>?
  }
  
  let state = Mutex<State>(State())
  
  func next() async throws {
    self.state.withLock { state in
      if state.task == nil {
        state.task = Task {
          // Consume from the base iterators
          // ... 
        }
      }
    }

    if let element = self.state.withLock { $0.buffer.popFirst() } {
      return element
    } else {
      // We are handling cancellation here and need to handle task escalation here as well
      try await withTaskCancellationHandler { 
        // HERE: need to handle priority escalation and boost `state.task`
        try await withCheckedContinuation { cont in
          self.state.withLock { $0.consumerContinuation = cont }
        }
      } onCancel: {
        // trigger cancellation of tasks and fail continuations
      }
    }
  }
}

The above example showcases a common pattern: often an unchecked continuation is paired with a Task used to complete it. Around the suspension on the continuation, waiting for it to be resumed, developers often install a task cancellation handler in order to potentially break out of potentially unbounded waiting for a continuation to be resumed. Around the same suspension (marked with HERE in the snippet above), we might want to insert a task priority escalation handler in order to priority boost the task that is used to resume the continuation. This can be important for correctness and performance of such operations, so we should find a way to offer these libraries to participate in task priority handling.

Another example of libraries which may want to reach for manual task priority escalation APIs are libraries which facilitate communication across process boundaries, and would like to react to priority escalation and propagate it to a different process. Relying on the built-in priority escalation mechanisms won't work, because they are necessarily in-process, so libraries like this need to be able to participate and be notified when priority escalation happens, and also be able to efficiently cause the escalation inside the other process.

Proposed solution

In order to address the above use-cases, we propose to add a pair of APIs: to react to priority escalation happening within a block of code, and an API to cause a priority escalation without resorting to trickery using creating new tasks whose only purpose is to escalate the priority of some other task:

let m: Mutex<Task<Void, Never>?> 

await withTaskPriorityEscalationHandler {
  await withCheckedContinuation { cc in 
    let t = Task { cc.resume() }
    m.withLock { $0 = t }
  } onPriorityEscalated: { newPriority in 
    let t = m.withLock { $0 }
    Task.escalatePriority(t, to: newPriority)
  }
}

Detailed design

We propose the addition of a task priority escalation handler, similar to task cancellation handlers already present in the concurrency library:

public func withTaskPriorityEscalationHandler<T, E>(
  operation: () async throws(E) -> T,
  onPriorityEscalated handler: @Sendable (TaskPriority) -> Void,
  isolation: isolated (any Actor)? = #isolation
) async throws(E) -> T

The shape of this API is similar to the withTaskCancellationHandler API present since initial Swift Concurrency release, however–unlike a cancellation handler–the onPriorityEscalated callback may be triggered multiple times. The TaskPriority passed to the handler is the "new priority" the surrounding task was escalated to.

It is guaranteed that priority is ever only increasing, as Swift Concurrency does not allow for a task priority to ever be lowered after it has been escalated. If attempts are made to escalate the task priority from multiple other threads to the same priority, the handler will only trigger once. However if priority is escalated to a high and then even higher priority, the handler may be invoked twice.

Task escalation handlers are inherently racy, and may sometimes miss an escalation, for example if it happened immediately before the handler was installed, like this:

// priority: low
// priority: high!
await withTaskPriorityEscalationHandler {
  await work()
} onPriorityEscalated: { newPriority in // may not be triggered if ->high escalation happened before handler was installed
  // do something
}

This is inherent to the nature of priority escalation and even with this behavior, we believe handlers are a worthy addition. One could also check for the Task.currentPriority and match it against our expectations inside the operation wrapped by the withTaskPriorityEscalationHandler if that could be useful to then perform the operation at an already immediately heightened priority.

Escalation handlers work with any existing task kind (child, unstructured, unstructured detached), and trigger at every level of the hierarchy in an "outside in" order:

let t = Task {
  await withTaskPriorityEscalationHandler {
    await withTaskGroup { group in 
      group.addTask { 
        await withTaskPriorityEscalationHandler {
          try? await Task.sleep(for: .seconds(1))
        } onPriorityEscalated: { newPriority in print("inner: \(newPriority)") }
      }
    }
  } onPriorityEscalated: { newPriority in print("outer: \(newPriority)") }
}

// escalate t -> high
// "outer: high"
// "inner: high"

The API can also be freely composed with withTaskCancellationHandler or there may even be multiple task escalation handlers registered on the same task (but in different pieces of the code).

Manually propagating priority escalation

While generally developers should not rely on manual task escalation handling, this API also does introduce a manual way to escalate a task's priority. Primarily this should be used in combination with a task escalation handler to propagate an escalation to an unstructured task which otherwise would miss reacting to the escalation.

The escalatePriority API is offered as a static method on Task in order to slightly hide it away from using it accidentally by stumbling upon it if it were directly declared as a member method of a Task.

extension Task {
  public static func escalatePriority(_ task: Task, to newPriority: TaskPriority)
  public static func escalatePriority(_ task: UnsafeCurrentTask, to newPriority: TaskPriority)
}

It is possible to escalate both a Task and UnsafeCurrentTask, however great care must be taken to not attempt to escalate an unsafe task handle if the task has already been destroyed. The Task accepting API is always safe.

Currently it is not possible to escalate a specific child task (created by async let or a task group) because those do not return task handles. We are interested in exposing task handles to child tasks in the future, and this design could then be easily amended to gain API to support such child task handles as well.

Source compatibility

This proposal is purely additive, and does not cause any source compatibility issues.

ABI compatibility

This proposal is purely ABI additive.

Alternatives considered

New Continuation APIs

We did consider if offering a new kind of continuation might be easier to work with for developers. One shape this might take is:

struct State {
  var cc = CheckedContinuation<Void, any Error>?
  var task: Task<Void, any Error>?
}
let C: Mutex<State>

await withCheckedContinuation2 { cc in
  // ...
  C.withLock { $0.cc = cc }
    
  let t = Task { 
    C.withLock { 
      $0.cc?.resume() // maybe we'd need to add 'tryResume'
    }
  }
  C.withLock { $0.task = t }
} onCancel: { cc in
  // remember the cc can only be resumed once; we'd need to offer 'tryResume'
  cc.resume(throwing: CancellationError()) 
} onPriorityEscalated: { cc, newPriority in
  print("new priority: \(newPriority)")
  C.withLock { Task.escalatePriority($0.task, to: newPriority) }
}

While at first this looks promising, we did not really remove much of the complexity -- careful locking is still necessary, and passing the continuation into the closures only makes it more error prone than not since it has become easier to accidentally multi-resume a continuation. This also does not compose well, and would only be offered around continuations, even if not all use-cases must necessarily suspend on a continuation to benefit from the priority escalation handling.

Overall, this seems like a tightly knit API that changes current idioms of with...Handler without really saving us from the inherent complexity of these handlers being invoked concurrently, and limiting the usefulness of those handlers to just "around a continuation" which may not always be the case.

11 Likes

Very cool!

Is there a particular reason the escalation APIs weren't made non-static methods on [UnsafeCurrent]Task? i.e.,

extension Task {
  public func escalatePriority(to newPriority: TaskPriority)
}

extension UnsafeCurrentTask {
  public func escalatePriority(to newPriority: TaskPriority)
}
2 Likes

That's addressed here:

The escalatePriority API is offered as a static method on Task in order to slightly hide it away from using it accidentally by stumbling upon it if it were directly declared as a member method of a Task.

In other words, I'd rather people not "stumble into it by autocomplete" and then figure it's a great idea to actually call them. Most people really have no need to call these things and at least by putting them a little bit out of the way I hope to signal that in ways other than their documentation (which also indicates "you probably do not want to use this API")

1 Like

I really appreciate this; I spend a lot of time explaining to people that they probably want explicit priorities about 80% less than they're actually using them.

3 Likes

One question I guess is:

Today I proposed them both on Task, perhaps the

  public static func escalatePriority(_ task: UnsafeCurrentTask, to newPriority: TaskPriority)

should be on UnsafeCurrentTask though.

1 Like

I don’t think this is a necessary compromise. The escalation handler could be synchronously invoked as part of the registration process to ensure a client can always propagate priority correctly.

Nit: I believe the first argument should have the label of:.

Would this pitfall exist with the more straightforward UnsafeCurrentTask.escalatePriority() spelling?

Yes since UnsafeCurrentTask is unsafe; it's not about the spelling but about the fact that touching unsafe task is unsafe -- if you escape it and pass it around it may point to released task.

What happens if a task has already started execution when its priority is escalated? Does it yield and resume at a higher priority with each escalation?

1 Like

That's precisely the situation we call "escalation", otherwise it is just starting at some specified priority. Tasks get escalated if they are already created or even running, and concurrently something causes their priority to increase.

The observable effect of Task.currentPriority is immediate.

The scheduling effect may or may not be immediate, depending on platform executor; As for ordering guarantees, Swift does never suspend without a suspension point; and e.g. enqueue order to actors is priority based, but again, a difference can only be observed as a task is picked up by an actor to be executed. Swift concurrency makes no strict promises about the underlying platform's threading decisions. It will take effect when possible is the best approximation I'd say. On Linux our runtime doesn't really deal with priority much (except the actor enqueue buckets), but on Apple platforms it's all up to Dispatch and however it decides to act on priorities.

5 Likes

Eventually, it should be possible to implement a safe CurrentTask: ~Escapable. Does this conflict with the static function spelling, since it would require passing a non-escapable value as an argument?

The example is racy (and doesn't compile as it doesn't unwrap the optional). It needs to handle the case where the priority is escalated before the task is stored in the mutex e.g.:

enum State {
    case initialized
    case task(Task<Void, Never>)
    case priority(TaskPriority)
}
let state: Mutex<State> = .init(.initialized)

await withTaskPriorityEscalationHandler {
    await withCheckedContinuation { cc in
        let task = Task { cc.resume() }
        
        let newPriority: TaskPriority? = state.withLock { state -> TaskPriority? in
            defer { state = .task(task) }
            switch state {
            case .initialized:
                return nil
            case .task:
                preconditionFailure("unreachable")
            case .priority(let priority):
                return priority
            }
        }
        // priority was escalated just before we have store the task in the mutex
        if let newPriority {
            Task.escalatePriority(task, to: newPriority)
        }
    } onPriorityEscalated: { newPriority in
        let task: Task<Void, Never>? = state.withLock { state -> Task<Void, Never>? in
            switch state {
            case .initialized, .priority:
                // priority was escalated just before we have store the task in the mutex
                state = .priority(newPriority)
                return nil
            case .task(let task):
                return task
            }
        }
        if let task {
            Task.escalatePriority(task, to: newPriority)
        }
    }
}

Would this be handled (with less verbosity) by my suggestion from earlier?

@dnadoba To be honest I on purpose did not do the whole state machine dance to not deter from the gist of the proposal, but yeah, the "real" dance is something like you propose -- it's even linked in the proposal.

It is true I suppose that these proposals are the sole reference for how to use some of these APIs so we might as well showcase a more realistic pattern there, even if it looks more scary - those APIs are not trivial to use after all. I'll adjust the sample a bit.


It is a compromise, the alternatives are:

  1. trigger only if escalation happens during the with... {} block -- this is currently proposed
  • downside: potential to "miss triggering the handler" if escalated right before the handler is installed
    • :bulb: however: if that is the case though, this is also before the operation runs, and thus the Task.currentPriority inside the operation already is the escalated value. So, why would we need to run the handler in any way, if the operation already has access to the escalated value in a natural way -- the current priority, which can be propagated from there without any need for an "escalation" per se.
  1. trigger in case if a task was "before" we installed the handler.
  • :warning: The "before" isn't actually quite right way to think about it. It actually is if the task was "ever at all" escalated, because we only can compare current to base priority.
  • downside: potentially triggering un-necessarily, or "repeatedly" if one sets a handler many times in the same task, and had already used currentPriority to kick off work at appropriate priority

The second is what you're suggesting @ksluder, and it has the following surprising effect:

// Option 2 leads to: 

let t = Task { 
  await withTaskPriorityEscalationHandler {
    assert(Task.currentPriority == .default) // ok, we started at some level
    // let's assume escalation happens during the with block...
  } onPriorityEscalated { p in } // triggers -> high; ok

  assert(Task.currentPriority == .high) // already high...
  await withTaskPriorityEscalationHandler {
    // we're still escalated already...
    assert(Task.currentPriority == .high) 
    await // ... 
  } onPriorityEscalated { p in } // ❗️triggers *again* -> high
  // since .high > base (.default)
}

Task.escalatePriority(of: t, to: .high)

And there's no way to prevent those spurious handler calls, even as though un-necessary -- the operation already has the correct priority while entering the block.

So with all that said, I'm leaning towards the Option 1 since it is somewhat more logical, and we should not be causing potentially expensive escalation work in a handler when it is entirely un-necessary.

Adding additional storage to a task to "remember at what level it had already triggered handlers" isn't viable IMHO, space in a task is precious, and we cannot use task local allocated values for this either, so we'd incur a heap allocation which disqualifies the idea IMHO - we can't incur a heap allocation just to keep track of an escalation having happened.

Escalations always are inherently racy (this is a fact regardless of how we spell this API) because the escalation may totally happen after a block of code is done doing the "real work" and the escalation may not have any real impact on the quicker return of the operation. So I don't think it's necessarily worth the extra work this can potentially cause -- especially when thinking about cross process escalations which are a rather heavy operation.

1 Like

Since I can run arbitrary code inside an escalation handler, how can one ever deem an invocation of the handler “un-necessary?”

I’m imagining my handler might interact with some external-to-Swift priority mechanism (pthread priority, CPU core cluster affinity, GPU power state…) that can be reset as different tasks switch off and on. Which I now realize implies the need for a companion API on TaskExecutor to be invoked whenever the Task already running on that executor switches priority.

"Un-necessary" because the currentPriority that would be read inside the operation block will already be the escalated value, so you can start the work at the appropriate (higher) priority right away, rather than start on "too low" and then escalate it using the handler mechanism. (Note: Task.init uses Task.currentPriority as its default priority, in case this wasn't clear - I explained this a bit more in the proposal now as well).

Note also that if synchronously invoking the handler before the operation, you'd have to find a way to sneak the "escalation happened" into the operation -- which is then done using the Mutex<State> pattern @dnadoba shared earlier.

You need to do this pattern anyway because of this potential order of events:

  1. withTaskEscalationHandler -- install handler
  2. not escalated yet
  3. run operation
    a. make new Task
    b. [concurrently] escalation happens (!)
    c. [concurrently] onPriorityEscalated triggered -- task was not stored yet (need to store the escalated priority)
    d. attempt to store task; State contains escalated priority; it must have happened while operation was running
    e. store the task + escalate the task

If escalation happened before we installed the handler, we'll miss triggering the handler, but it doesn't really matter for priority purposes because then it also means it was before operation, and thus operation can observe Task.currentPriority at the escalated value and start the Task(priority: Task.currentPriority) rather than start the task at the "too low" priority only to immediately boost it (which is strictly worse than immediately scheduling at the expected higher priority).

// edit: @dnadoba I improved the example and added a thanks in acknowladgements :slight_smile:

2 Likes

I amended the

Task.escalatePriority(of: task, to: newPriority)

to include the of:. Generally we sometimes overuse those "of" or "on" but here it looks quite nice, thanks for the input.

I believe there is an issue with where onPriorityEscalated is placed in the following code:

Based on the detailed design, here is what I think is correct:

await withTaskPriorityEscalationHandler {
  await withCheckedContinuation { cc in 
    let t = Task { cc.resume() }
    m.withLock { $0 = t }
  }
} onPriorityEscalated: { newPriority in 
  let t = m.withLock { $0 }
  Task.escalatePriority(t, to: newPriority)
}
1 Like

for reference purposes, putting the two task state change handler APIs together here:

public func withTaskPriorityEscalationHandler<T, E>(
  operation: () async throws(E) -> T,
  onPriorityEscalated handler: @Sendable (TaskPriority) -> Void,
  isolation: isolated (any Actor)? = #isolation
) async throws(E) -> T
public func withTaskCancellationHandler<T>(
  operation: () async throws -> T,
  onCancel handler: @Sendable () -> Void,
  isolation: isolated (any Actor)? = #isolation
) async rethrows -> T

apologies if this seems a bit 'ramble-y' and disorganized, but i have a number of thoughts/questions based on the proposal that i'm curious to hear your input on:

motivation

the proposal alludes to 'trickery' that's currently done today as a means of indirectly inducing this escalation behavior. what exactly does that look like?

composition

given that the motivating example in the proposal nests both of the 'respond to Task state change' handlers, and i imagine that the emergent 'best practices' for this sort of thing will often end up doing the same, this raises a few questions about how well these two things compose.

a clear difference is the 'typed-throws-ness' of the two functions. given that they are not both typed throws, will order of nesting matter? if you nest these or try to compose them in a single wrapper function, are the errors expected to have to be type erased?

and on the subject of trying to combine them into a single function – in the alternatives section, an API that does combine both of these handlers into a single method is outlined. given that these two functions will often be combined, i expect some developers to write the same form of utility but with these tools as primitives (since that also would reduce the 'pyramid of doom' somewhat). do you think the stdlib may have a role in offering something like that? are there other 'task environment changed' callbacks one might envision wanting to expose in the future (task locals?)?

naming

two small things here:

  1. the cancel handler is named onCancel and not onCanceled, which makes me wonder if there is a more-aligned naming choice to be made. onPriorityEscalation or just onEscalation if the implicit context is sufficient? or maybe this new one has made the better choice and cancellation would ideally be changed?
  2. the proposed static methods seem like they should have named parameters for the tasks edit: i see this has now been changed in the PR.

misc

  • mainly out of curiosity, does Dispatch expose anything like this to explicitly increase QoS?
    • if not, why has such a tool not been necessary to expose as public API in that context?
  • does @isolated(any) have any role on the operation parameter?
  • are the execution semantics of the new method analogous to those of the cancellation handler? cancellation handling specifically has an 'Execution order and semantics' section in its docs.
  • if a task is canceled, will it still respond to priority escalation events?
4 Likes

I don't really want to put this hack into the proposal, it looks like this:

let escalateMe: Task<...>
Task(priority: .high) { await task.value }

The lack of, or slowness-in, adoption of typed throws by the existing APIs is not really subject of this proposal. We must and will fix it, but not as part of this proposal.

The end goal is for all concurrency APIs to properly adopt typed throws so new APIs should be adopting them right away -- as we do here.

Dispatch queues primarily deal with QoS and there's many APIs which take it. You can also use target queues and yes there's boosting and similar things happening, including when sync is used to wait on lower priority work etc. A lot of the supporting infrastructure is actually in the kernel though.

There's no "callback" for escalation because queue and thread priority can go up and down and it would not be reasonable to expect developers to be able to reasonably react to these more frequent events. Swift concurrency priority only ever goes up -- as discussed in the proposal, making it simpler to react to. And yes, this can cause "overhang" however tasks are shorter lived than threads or queues in general so we concluded it to be less of a concern.

If you have something in mind please share; so far I don't see the need for it.

The thread here just now was discussing the exact order semantics, so given that we seem to have converged on the semantics now we can add it to the proposal.

Sure, they're independent.

1 Like

You've demonstrated how this overload with Task could be used, but I am curious how one would use the UnsafeCurrentTask overload.

Do you by chance have any example to share? Or maybe some use cases where this could be helpful? Is this the overload you'd use in a structured context in contrast to escalating unstructured tasks?

1 Like