Support custom executors in Swift concurrency

Over the last few months, we've gotten a number of questions about how to customize execution in Swift concurrency. I'd like to propose the definition of custom executors, which I hope addresses many of these questions.

To quote the Proposed Solution section from the current draft:

An executor is an object to which opaque jobs can be submitted to be run later. Executors come in two basic kinds: serial executors run at most one job at a time, while concurrent executors can run any number of jobs at once. This proposal is concerned with both kinds, but their treatment is quite different.

Swift's concurrency design includes both a default concurrent executor, which is global to the process, and a default serial executor implementation, which is used for actor instances. We propose to allow this to be customized in a number of ways:

  • A custom executor can be defined by defining a type that conforms to the Executor protocol.
  • The concurrency library will provide functions to explicitly run an asynchronous function on a specific executor.
  • Actors can override how code executes on them by providing a reference to a specific serial executor.
  • The default concurrent executor can be replaced by defining certain symbols within the program.
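As a rough illustration of the "opaque jobs submitted to be run later" idea, here is a minimal single-threaded sketch. It is not code from the proposal: `ToyJob`, `ToyExecutor`, `ToySerialExecutor`, and `drain()` are made-up names, and the real protocol works with job references rather than closures.

```swift
// Hypothetical stand-ins; these are not the proposal's declarations.
typealias ToyJob = () -> Void

protocol ToyExecutor: AnyObject {
    /// Submit an opaque job to be run later.
    func enqueue(_ job: @escaping ToyJob)
}

/// A toy serial executor: runs at most one job at a time, in FIFO order.
/// Single-threaded for simplicity; a real executor would need synchronization.
final class ToySerialExecutor: ToyExecutor {
    private var queue: [ToyJob] = []
    private var isDraining = false

    func enqueue(_ job: @escaping ToyJob) {
        queue.append(job)
    }

    /// Run queued jobs one after another until the queue is empty.
    func drain() {
        guard !isDraining else { return }   // never run two jobs at once
        isDraining = true
        defer { isDraining = false }
        while !queue.isEmpty {
            let job = queue.removeFirst()
            job()                           // the job may enqueue more work
        }
    }
}
```

Jobs enqueued while the executor is draining are picked up by the same loop, so they still run one at a time and in submission order.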

Please read the full proposal and let me know what you think.

38 Likes

I would also point out that *Ref is a C/C++ naming convention, not Swift's. Even if these types were references, they'd simply be UnownedExecutor and UnownedJob.

3 Likes

Priority is discussed elsewhere in the concurrency proposals, I think in Structured Concurrency.

The Ref types are themselves value types, but they hold references to actors and executors, both of which must be reference types. It’s because of that (conceptual, not physical) indirection that I thought the Ref suffix would be clarifying.

7 Likes

Absolutely delighted to see that this level of customization is being exposed to users at this early stage. Two questions on an initial read of the document:

Regarding SerialExecutor, there is this curious asymmetry between giving up the thread and attempting to claim the thread:

protocol SerialExecutor: Executor {
  /// Is it possible for this executor to give up the current thread
  /// and allow it to start running a different actor?
  var canGiveUpThread: Bool { get }

  /// Given that canGiveUpThread() previously returned true, give up
  /// the current thread.
  func giveUpThread()

  /// Attempt to start running a task on the current actor.  Returns
  /// true if this succeeds.
  func tryClaimThread() -> Bool
}

How often would one check canGiveUpThread that would not then involve moving forward to invoke giveUpThread()? I assume some consideration drove the pitched design here as opposed to having a tryGiveUpThread() -> Bool but I'm curious as to what that is.

With the assumption that the above was a considered divergence, then a corresponding question about tryClaimThread(): Is there some reason that the same considerations applied to separating canGiveUpThread and giveUpThread do not apply to claiming the thread?


An actor may derive its executor implementation in one of the following ways. We may add more ways in the future.

  • The actor may declare a property named serialExecutor . The property must not be actor-isolated.
  • [...]

Noting that Actor is a protocol, can an actor be said to "declare a property named serialExecutor" for the purposes of this first bullet point by virtue of explicitly conforming to a protocol MyCustomActor: Actor which provides a default implementation of serialExecutor?

(A similar question regarding the second bullet point: Can an actor be said to "declare a property named delegateActor" for the purposes of this second bullet point by virtue of explicitly conforming to a protocol MyCustomActor2 : Actor which provides a default implementation of delegateActor?)

2 Likes

You want to ask the current executor whether its thread can be reused before you call tryClaimThread() because if you needlessly claim an executor for a thread that's not supposed to actually execute it, you're creating a lot of unnecessary contention on the target executor. If the only way you can ask the current executor whether its thread can be reused is to ask it to give up its thread, it might give up its thread despite you not actually being able to take over the target executor.

I can't say that the asymmetry is logically required, but there are pragmatic reasons that support it. Claiming a thread is unlikely to unconditionally succeed because typical executors can already be running concurrently, or they can have earlier jobs (if FIFO-ordered) or higher-priority jobs (if priority-ordered) to run first. It's temporally conditional because those conditions can arise at any time. On the flip side, giving up a thread is typically temporally unconditional: it's either impossible (because the thread is reserved for the executor alone) or always allowed, but it's not dependent on what work is enqueued on the executor. I can imagine cases where these things aren't true — an executor that's pinned to a thread which it's willing to share with other work, or an executor that adaptively gets possessive about its thread when there's a lot of work enqueued there — but they're a lot less likely.

In practice, we should be able to skip the canGiveUpThread() call for executors that unconditionally support giving up their threads, like the default serial executor.
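To make the ordering argument concrete, here is a small sketch of how a switch might sequence the three calls. This is only my reading of the explanation above, not the runtime's code; `ToySwitchingExecutor`, `FlagExecutor`, and `trySwitch(from:to:)` are hypothetical names.

```swift
// Hypothetical protocol mirroring the three members quoted earlier.
protocol ToySwitchingExecutor: AnyObject {
    var canGiveUpThread: Bool { get }
    func giveUpThread()
    func tryClaimThread() -> Bool
}

/// Toy executor whose state is just two flags (single-threaded, no atomics).
final class FlagExecutor: ToySwitchingExecutor {
    let pinned: Bool                 // true: the thread is reserved for this executor
    private(set) var running: Bool   // true: some thread is currently running it
    private(set) var claimAttempts = 0

    init(pinned: Bool, running: Bool) {
        self.pinned = pinned
        self.running = running
    }

    var canGiveUpThread: Bool { !pinned }
    func giveUpThread() { running = false }
    func tryClaimThread() -> Bool {
        claimAttempts += 1
        if running { return false }  // claiming is conditional on current state
        running = true
        return true
    }
}

/// Returns true if the thread now runs `target`. A false result means the
/// caller should fall back to enqueuing a job on `target` instead.
func trySwitch(from current: ToySwitchingExecutor,
               to target: ToySwitchingExecutor) -> Bool {
    // Ask first, without side effects: if the current executor cannot give
    // up its thread, do not touch (and contend on) the target at all.
    guard current.canGiveUpThread else { return false }
    // Claiming can fail at any time, for example if the target is busy.
    guard target.tryClaimThread() else { return false }
    // Only release the current executor once the claim has succeeded.
    current.giveUpThread()
    return true
}
```

With a combined tryGiveUpThread(), the first step would already have released the current executor before it was known whether the claim on the target could succeed.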

That's a very good question. I would say that implementations from protocol extensions ought to count in both cases, yes, but I should check with @Douglas_Gregor to see if he sees any circularity problems from allowing that.

3 Likes

Can you explain the switching concept at a very basic level? I don’t understand it.

1 Like

An actor is conceptually like a lock: it protects some data from data races by only allowing one bit of code to access it at a time. That basic abstraction can be implemented in a number of ways.

One traditional way to implement it is to have the actor be a sort of job queue; when you want the actor to do something, you just add a job to the actor's queue, and you make sure there's a thread that's going to process the queue. The actor might have a thread dedicated to it, always waiting around for something to be added to its queue, in which case you just have to wake that thread up; or it might not, meaning you need to ask the system for a thread to come run jobs for the actor. In either case, some thread runs jobs from the actor queue one after another, usually until it's drained the queue completely, and then goes back to either wait for more jobs or do something else.

That traditional implementation is heavily biased towards preserving locality for the data and code of the actor and biased away from preserving locality for the data and code of the tasks that want to use the actor. That's great if you have an actor that's heavily contended, because there'll be a single thread that's processing all the actor's jobs, and the CPU will naturally keep the actor's data and functions in cache on whatever core is running that thread, allowing all the actor operations to complete very quickly. But it's pretty bad if you have an actor that's lightly contended, because it's quite likely that you'll do a thread switch every time you either start or finish a request on the actor, and the locality for all the data and code associated with your task can be very poor.

The idea of "switching" is that the thread follows a single task as much as possible. An actor still has a job queue. If a task makes a request of an actor, then instead of immediately adding an asynchronous job to that queue, the task asks to start executing the actor on the current thread. Now, that can fail, most likely because there's already a thread running the actor; if it does, then the task just adds itself as a job to the actor's queue, like it would in the traditional queue implementation. But if it succeeds, then the current thread can immediately go and run code on behalf of the actor, for basically no more cost than a single atomic operation. And on the other side, when the task is done running code on the actor, it just asks the actor to record that there's no longer a thread processing it, and then the thread can immediately continue running whatever code comes next for the task. If the actor has more jobs to do, it just asks the system for a thread to continue processing it. (As a result, when there is significant contention for the actor, actor processing can end up bouncing between threads, which isn't great. I expect that we'll want to do some work to recognize this kind of actor contention dynamically and have the thread stick with the actor for a while. But low contention is very common.)

Another way of looking at this is that the actor has an "asynchronous lock". A traditional synchronous mutex has a queue of waiting threads built into it, and when a thread fails to acquire the lock, it blocks on that queue until the lock is available. This is basically exactly that, except that it's a queue of jobs instead of threads, and so waiting never blocks a thread, it just blocks a task by adding a job to resume it to the queue.
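A toy version of that "asynchronous lock" might look like the following. This is only a sketch under simplifying assumptions, not how the runtime implements it: `ToyActorLock`, `claimOrEnqueue`, and `finish` are made-up names, an `NSLock` stands in for the single atomic operation, and handing the next job back to the caller stands in for "ask the system for a thread".

```swift
import Foundation

/// Hypothetical "asynchronous lock": failed acquisitions queue a job
/// instead of blocking the calling thread.
final class ToyActorLock {
    private let state = NSLock()              // protects the fields below
    private var isRunning = false             // is some thread processing the actor?
    private var waiting: [() -> Void] = []    // queued jobs, not blocked threads

    /// Fast path: if the actor is idle, claim it and return true, and the
    /// caller runs its actor code inline on the current thread.
    /// Slow path: the actor is busy, so `resume` is queued and false is
    /// returned; the calling thread is free to do other work.
    func claimOrEnqueue(_ resume: @escaping () -> Void) -> Bool {
        state.lock()
        defer { state.unlock() }
        if isRunning {
            waiting.append(resume)
            return false
        }
        isRunning = true
        return true
    }

    /// Called when the current holder is done with the actor. Returns the
    /// next queued job if there is one (the actor stays claimed on its
    /// behalf, and the caller is expected to schedule it somewhere), or nil
    /// after recording that no thread is processing the actor.
    func finish() -> (() -> Void)? {
        state.lock()
        defer { state.unlock() }
        if waiting.isEmpty {
            isRunning = false
            return nil
        }
        return waiting.removeFirst()
    }
}
```

Checking and queuing happen under the same lock, so a job cannot be stranded between a failed claim and the holder's release.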

36 Likes

This may be a stupid question, but why is UnownedJobRef a value type?

It would be possible to create multiple copies of a job reference by mere assignment, which to me seems undesirable (why would you want to allow that?). Even more so when considering that execution invalidates a job reference -- would that also invalidate copies of a job ref? Or am I getting this entirely wrong?

Thank you for putting this together. This will be a major game-changer for systems that do non-blocking, evented I/O directly with the OS interfaces (such as kqueue/epoll/io_uring). Without custom executors, the I/O eventing threads (which may block in kevent/epoll_wait/io_uring_enter) cannot be the "default executor" threads (because of course you don't want to block them). Therefore without custom executors such systems would have necessarily incurred a pervasive amount of thread switching to get from the I/O threads onto the executors for async/await and back.

As a real-world example, SwiftNIO does its own evented I/O handling (mostly using kqueue/epoll at the moment), so needless to say I'm super happy about and very supportive of this proposal, because it will mean a major speed improvement when using async/await with SwiftNIO. [later addition through EDIT]: I wrote this up in a little more detail.


@John_McCall I've got one question regarding the immediate execution of asynchronous work in an executor: Let's imagine we're in a (custom) executor and we want to call user code in the form of a let userCode: () async -> Void. One way we can of course make this work is by using

// self is a serial, custom executor
self.run(userCode)

which would then soon call the executor's func enqueue(_ job: UnownedJobRef) with the userCode as an UnownedJobRef. The executor would then probably enqueue that in its own task queue and soon after run

userCodeAsUnownedRefFromTaskQueue.execute(self.asUnownedRef())

This works but will always go through the task queue. But what if the custom executor knows that it can run a task straight away (without going through the queue)? How could we (without using enqueue) go from userCode: () async -> Void to an UnownedJobRef that we can then execute immediately?

5 Likes

Nit (Motivation section):

and instead needs only on high-level semantic properties

“needs only high-level…” or “relies only on high-level…”, I assume

UnownedJobRef:

The executor reference must remain valid during this call unless it is a serial executor which the job successfully gives up.

This is unclear to me. Does it mean that if execute(currentExecutor: someRef) in turn causes some switching or suspending activity, someRef can then become invalid before this invocation of execute returns?

Also, would it not make sense to have a version of Task.runDetached that takes an executor? Or should we assume that Task.runDetached { someExecutor.run { … }} will avoid bouncing off the default concurrent scheduler?

Executor.run doesn’t translate directly to a call to enqueue; it does an executor switch in the current task, and executor switches should always check whether you’re already on the target executor. So if you’re sure you’re on an executor already, calling run on that same executor doesn’t do anything beyond just calling the operation directly.
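In toy form, the "check whether you're already there" behaviour could be sketched like this. It is a simplified synchronous model, not the pitched API: `ToyThreadContext`, `QueueExecutor`, and `drain()` are hypothetical, and the real `run` is an executor switch inside an async task rather than a closure call.

```swift
/// Hypothetical stand-in for "which executor is this thread running right now".
final class ToyThreadContext {
    var current: QueueExecutor?
}

final class QueueExecutor {
    private let context: ToyThreadContext
    private var queue: [() -> Void] = []
    private(set) var enqueueCount = 0

    init(context: ToyThreadContext) {
        self.context = context
    }

    func enqueue(_ job: @escaping () -> Void) {
        enqueueCount += 1
        queue.append(job)
    }

    /// Switch to this executor and run `operation`.
    func run(_ operation: @escaping () -> Void) {
        if context.current === self {
            operation()          // already on this executor: just call it
        } else {
            enqueue(operation)   // otherwise go through the queue
        }
    }

    /// Process queued jobs, marking this executor as current while doing so.
    func drain() {
        let previous = context.current
        context.current = self
        defer { context.current = previous }
        while !queue.isEmpty {
            let job = queue.removeFirst()
            job()
        }
    }
}
```

In this model, a nested run from a job that is already executing on the executor never touches the queue, which is the property the question was after.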

4 Likes

Yes, if a serial executor participates in switching, then the serial executor (if any) that’s running at the end of a job might be totally different from the original serial executor that was running at the start, and the latter may even have been deallocated.

That’s a great point, and I’d meant to have something about this in the pitch. I think we should be able to do that analysis reliably, but allowing tasks to override the initial executor explicitly may still be the right way to go.

Thanks for explaining the switching in more detail @John_McCall. I'm still pretty confused about who is responsible for what. I'm going to write a bit and ask some more questions. Please stop me if my assumptions are incorrect.

It seems to me that the Executor is responsible for keeping track of the jobs enqueued to it. I assume the system also knows which jobs are enqueued on which executor.

For a switching Executor am I understanding correctly that the executor will essentially own a thread and that thread will be given to the executor by the system calling executor.tryClaimThread()? If the executor is able to claim the thread it returns true and then is it correct to say that the system will reserve that thread only for this executor until the system is able to take the thread back by a successful call to canGiveUpThread and giveUpThread()?

That's correct, yes, at least until the job is executed.

I assume the system also knows which jobs are enqueued on which executor.

Not beyond what the executors do.

For a switching Executor am I understanding correctly that the executor will essentially own a thread and that thread will be given to the executor by the system calling executor.tryClaimThread() ?

I wouldn't describe it as ownership exactly, but yes, if a serial executor starts running in a thread, it's that executor's prerogative when to give it up.

It's possible that the exact protocol used here will need some revision.

I have a couple of terminological questions, in relation to async/await and structured concurrency.

In that context, what is a "job"? Is it a partial task? Is it a portion of a task between potential suspension points, or between actual suspension points? Or is it an entire task?

Also, since the term "partial task" is not used in the async/await proposal, what is the correct equivalent term there? (Specifically, for fragments of synchronous code between await-ed statements? Also "partial task"? Or is the concept of a task explicitly left out of async/await (non-concurrent)?)

I don't see any real problem here, but it would be good to make these definitions explicit in the proposals, I think.

I think there's some confusion here. On a semantic level, an UnownedJobRef is a reference to an existing, opaque job. There is no way to make it a "value type" in any semantically meaningful way, where copying it would give you an independent reference to a different job, because there's only one underlying job, and it's opaque and non-copyable, and you can't safely run it twice.

Given that, UnownedJobRef is a struct because making it a class would require a new class instance to get allocated every time a job was needed, which would be needless overhead.

If Swift supported move-only types, UnownedJobRef could be a safe move-only type instead of an unsafe reference. But Swift doesn't support move-only types yet, and we can express this immediately instead of blocking it on move-only types by using an unsafe reference, and so we're using an unsafe reference.
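For a feel of what "a struct holding an unsafe reference" means, here is a small sketch. The types are hypothetical (`ToyRuntimeJob` and `ToyUnownedJobRef` are not the proposal's declarations), and unlike a real job this toy notices when it is run a second time, purely so the hazard is visible.

```swift
/// Hypothetical job object; the real one is opaque and owned by the runtime.
final class ToyRuntimeJob {
    private var body: (() -> Void)?

    init(_ body: @escaping () -> Void) {
        self.body = body
    }

    /// Runs the job once. Returns false if it had already been consumed.
    func runOnce() -> Bool {
        guard let work = body else { return false }
        body = nil
        work()
        return true
    }
}

/// A struct wrapping an unretained reference. Copying it copies a pointer:
/// no allocation and no reference counting, but also no new job.
struct ToyUnownedJobRef {
    private let job: Unmanaged<ToyRuntimeJob>

    /// The caller must keep `job` alive for as long as the ref is used.
    init(_ job: ToyRuntimeJob) {
        self.job = Unmanaged.passUnretained(job)
    }

    /// Executing through any copy consumes the one underlying job.
    func execute() -> Bool {
        job.takeUnretainedValue().runOnce()
    }
}
```

Every copy of the struct points at the same job, so executing one copy invalidates them all. That is the sense in which it is a reference rather than a value, and a move-only type would let the compiler rule out the second use.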

6 Likes

I'm using job instead of partial task because I think it's a clearer and more universal piece of terminology. I guess if you wanted to use both terms and draw a distinction, you would say that a partial task is a kind of job — probably the most common kind of job, and the only kind of job currently created by the system as just part of running async functions — but that we didn't want to constrain jobs to only be partial tasks.

The job to resume a task will run the task until it actually suspends. That means not only a true suspension point but a suspension point where the task's execution is in fact dynamically suspended. So the categories are:

  • A potential suspension point is a syntactic position in a function where suspension is possible, maybe depending on what some other code does; it must be marked with await.
  • A suspension point is a place in the formal computation history of a task where the semantics of the language permit the task to be suspended.
  • A dynamic suspension point is a place in the formal computation history of a task where the implementation did actually suspend the task, allowing other work to occur on the current thread and (potentially) causing the task to eventually resume on a different thread.

2 Likes

Does that mean a switching Executor will be given a thread before the first job is enqueued on it?

Are all the following expected to be called from any specific thread or can they be called from any thread?

  • func enqueue(_ job: UnownedJobRef)
  • var canGiveUpThread: Bool { get }
  • func giveUpThread()

I assume func tryClaimThread() -> Bool is called from the thread the system wants the Executor to start using.

Are there guarantees about the order of the above calls? For example will the system ensure that it will never call canGiveUpThread, then enqueue, then giveUpThread?

Is giveUpThread essentially a promise from the Executor to behave and not make calls on it again?

  /// Is it possible for this executor to give up the current thread
  /// and allow it to start running a different actor?
  var canGiveUpThread: Bool { get }

Does "allow it to start running a different actor" mean let another executor use it?

  /// Attempt to start running a task on the current actor.  Returns
  /// true if this succeeds.
  func tryClaimThread() -> Bool

What does "on the current actor" mean here? Is that a typo? Should it say "thread"?

In situations where switching is impossible, such as when the actor is already executing on a thread, a job to process the actor will be scheduled onto the default concurrent executor.

Can you explain this? Is this talking about enqueuing the current partial task or is this enqueuing a job that will enqueue the partial task?


Do I understand the following correctly? Switching is used when a task is running across actor boundaries. Take for example a call from inside actor A to actor B. In that case there will be a partial task on actor A which makes a call to a partial task on actor B. In order to not do a thread hop from actor A's executor's thread to actor B's executor's thread, after the partial task on actor A is complete, the system will ask executor A whether it canGiveUpThread and then ask it to giveUpThread. If executor A does give it up, the system will ask executor B to tryClaimThread. Thus the partial task in actor B will run on the same thread. Then I guess the same thing would happen in reverse when the actor B call completes and passes data back.

Will the system only ever ask an Executor to give up its thread when it knows the Executor has no enqueued tasks?

What happens if an Executor has no thread? Is it an executor error to not claim an offered thread?

What happens if a bunch of Executors refuse to give up their threads? Is that just bad executor design? Will the system create more threads?

Sorry for all the questions. Thanks!

1 Like

We can't have completely different priority types for different executors. Tasks have priorities and can require work to be done across many different executors. And ultimately the priority system has to be reflected into at least the platform's thread-priority system.

I don't think there's a whole lot up in the air about the priority system except that people are unhappy about the names not all being equally applicable in all domains.

2 Likes

No, look, this is why I think ownership is the wrong way of thinking about it. You're trying to imagine every executor as being its own little thread pool, possibly of at most one thread, with threads shifting in and out of it. I don't think that's an illuminating mental model, but I'm not sure how to break you out of it.

All of the protocol members can be called from any thread.