tl;dr - I built a mechanism for interleaving nonisolated(nonsending) work at suspension points. It works! But have I built a foot-gun?
I've been leaning heavily into non-sendable types and nonisolated(nonsending) async methods to build up subsystems. It's been working really well. I love how you can spell out isolated islands of collaborators without picking the isolation ahead of time: callers create the instances and choose the isolation.
But I've hit situations where I'd love to interleave work that isn't explicitly isolated. The workers must never touch shared state at the same time, but they may await things that suspend, and while one worker is suspended, the others in the same implicit isolation could make forward progress. A best-effort "everyone make forward progress together" mechanism.
So I came up with this idea:
    final class SharedNonSendableThing {
        var output: String = ""

        nonisolated(nonsending) func append(_ counter: Int) async {
            await Task.yield() // Simulate suspending work.
            output.append("\(counter)\n")
        }
    }

    nonisolated(nonsending) func myTurn() async throws {
        let thing = SharedNonSendableThing()
        await InterleavedWorkQueue.run { group in
            group.run {
                for i in 0..<100 {
                    await thing.append(i)
                }
            }
            group.run {
                for i in 100..<200 {
                    await thing.append(i)
                }
            }
            group.run {
                for i in 200..<300 {
                    await thing.append(i)
                }
            }
        }
        print(thing.output)
    }
Notice how myTurn() is nonisolated(nonsending). I wouldn't be able to use async let or TaskGroup here because thing can't be safely captured and mutated across different isolation domains.
To achieve the same idea as TaskGroup, InterleavedWorkQueue takes nonisolated(nonsending) closures and promises to do the right thing. The compiler is happy and lets things through!
If I comment out the await Task.yield() line, output contains the lines for 0..<300 in order. With the await in place, the three ranges are interleaved to varying degrees.
This isn't trying to require concurrency. It's a way to schedule work that could make forward progress if one of the other worker closures suspends.
The way I got this to work is by using an actor instance internal to InterleavedWorkQueue. This private instance provides a serial isolation guarantee: you can't start work concurrently. It leverages actor reentrancy, so a closure that suspends lets another closure run on the same actor in the meantime, which is what makes the interleaving safe. It uses task groups internally and unsafe annotations to convince the compiler it'll all work out.
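To make the reentrancy point concrete, here is a minimal sketch, separate from the queue itself, of two child tasks sharing one actor. The names `Log`, `work(_:steps:)`, and `demo()` are made up for illustration. Each call suspends at `Task.yield()`, which gives the other call a chance to run on the actor, yet the two never execute at the same time.

```swift
// Hypothetical actor used only to illustrate reentrancy.
actor Log {
    private(set) var entries: [String] = []

    func work(_ name: String, steps: Int) async {
        for step in 0..<steps {
            // Suspension point: the actor is free to run another call here.
            await Task.yield()
            // Back on the actor; this mutation is serialized with all others.
            entries.append("\(name)\(step)")
        }
    }
}

func demo() async -> [String] {
    let log = Log()
    await withTaskGroup(of: Void.self) { group in
        group.addTask { await log.work("a", steps: 3) }
        group.addTask { await log.work("b", steps: 3) }
    }
    // Both children have finished by the time the group returns.
    return await log.entries
}
```

The relative order of the "a" and "b" entries can change from run to run, but each worker's own entries always appear in order, and no append ever overlaps another.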
So my question is whether I'm thinking about this correctly. The private actor guarantees everything is started without overlapping access. Any suspension allows forward progress on another non-isolated closure. Just like withTaskGroup, the outer closure suspends while the interleaved work is running.
Based on my reading, this interface tells the compiler that non-isolated closures can interleave safely. And the internals ensure that.
Am I reading this wrong?
Here's the implementation of InterleavedWorkQueue:
    public enum InterleavedWorkQueue {
        public typealias Runner = nonisolated(nonsending) (inout Group) async -> Void
        public typealias Work = nonisolated(nonsending) () async -> Void

        public struct Group: ~Copyable {
            fileprivate let isolation: Isolation
            fileprivate var taskGroup: TaskGroup<Void>

            public mutating func run(_ work: @escaping Work) {
                let isolation = self.isolation
                let w = UnsafeWork(work: work)
                taskGroup.addTask {
                    await isolation.run(w)
                }
            }
        }

        public static nonisolated(nonsending) func run(_ runner: Runner) async {
            // If someone gets sneaky and creates a new `InterleavedWorkQueue` within an
            // existing closure passed to a different work queue, we want to make sure they all
            // share the same isolation domain.
            //
            // A task local holding a common isolation is the best way to pull this off.
            let isolation = Self.isolation ?? Isolation()
            await Self.$isolation.withValue(isolation) {
                await withTaskGroup(of: Void.self) { taskGroup in
                    var g = Group(isolation: isolation, taskGroup: taskGroup)
                    await runner(&g)
                    await taskGroup.waitForAll()
                }
            }
        }

        /// The task local ensures that anyone within a worker closure's call tree who tries
        /// to create a new work queue shares the same isolation. We don't want to allow an
        /// escape hatch that constructs conflicting access to closure captures by making
        /// multiple nested queues.
        @TaskLocal
        fileprivate static var isolation: Isolation?

        /// Purposely `@unchecked Sendable` because we want to carry the non-isolated and
        /// non-sending `Work` closure into the isolating actor. We're promising to _only_
        /// invoke this on that actor so we maintain the exclusivity contract.
        fileprivate struct UnsafeWork: @unchecked Sendable {
            var work: Work
        }

        /// A private actor that provides the isolation guarantee. We unsafely carry all the
        /// provided work closures into this actor's isolation domain.
        fileprivate actor Isolation {
            func run(_ work: UnsafeWork) async {
                await work.work()
            }
        }
    }
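The nested-queue protection relies on how task-local values propagate, so here is a small standalone sketch of that behavior. The `Scope` enum and `observe()` function are hypothetical names, not part of the queue. Child tasks added to a task group see the binding made by `withValue`, while a detached task does not, which suggests a nested queue created inside a detached task would not find the shared isolation.

```swift
// Hypothetical task local, standing in for the queue's private `isolation`.
enum Scope {
    @TaskLocal static var name: String?
}

// Returns [value seen by a child task, value seen by a detached task].
func observe() async -> [String?] {
    await Scope.$name.withValue("outer") {
        // A task-group child inherits the task-local binding.
        let child: String? = await withTaskGroup(of: String?.self) { group in
            group.addTask { Scope.name }
            var seen: String? = nil
            for await value in group { seen = value }
            return seen
        }
        // A detached task starts without any inherited task locals.
        let detached: String? = await Task.detached { Scope.name }.value
        return [child, detached]
    }
}
```

Here the child task reads "outer" and the detached task reads nil.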