Non-isolated, Non-sending Interleaved Work Queue

tl;dr - I built a mechanism for interleaving nonisolated(nonsending) work at suspension points. It works! But have I built a foot-gun?

I've been leaning heavily into non-sendable types and nonisolated(nonsending) async methods to build up subsystems. It's been working really well—I love how you can spell out isolated islands of collaborators without picking the isolation ahead of time. Callers create the instances and choose the isolation.

But I've hit situations where I'd love to interleave work that isn't explicitly isolated. The work must never conflict, but it may await things that suspend...and if one worker is suspended, the others in the same non-explicit isolation could make forward progress. A best-effort "everyone make forward progress together" mechanism.

So I came up with this idea:

final class SharedNonSendableThing {
    var output: String = ""
    
    nonisolated(nonsending) func append(_ counter: Int) async {
        await Task.yield() // Simulate suspending work.
        output.append("\(counter)\n")
    }
}

nonisolated(nonsending) func myTurn() async throws {
    let thing = SharedNonSendableThing()

    await InterleavedWorkQueue.run { group in
        group.run {
            for i in 0..<100 {
                await thing.append(i)
            }
        }
        group.run {
            for i in 100..<200 {
                await thing.append(i)
            }
        }
        group.run {
            for i in 200..<300 {
                await thing.append(i)
            }
        }
    }
    
    print(thing.output)
}

Notice how myTurn() is nonisolated(nonsending). I wouldn't be able to use async let or TaskGroup here because thing can't be safely captured and mutated across different isolation domains.

To achieve the same idea as TaskGroup, InterleavedWorkQueue takes nonisolated(nonsending) closures and promises to do the right thing. The compiler is happy and lets things through!

If I comment out the await Task.yield() line, output contains lines from 0..<300 in order. With the await, it's interleaved to varying degrees.

This isn't trying to require concurrency. It's a way to schedule work that could make forward progress if one of the other worker closures suspends.

The way I got this to work is by using an Actor instance internal to InterleavedWorkQueue. This private instance provides a serial isolation guarantee—you can't start work concurrently. It leverages actor reentrancy to take advantage of interleaving safely. It uses task groups internally and unsafe annotations to convince the compiler it'll all work out.
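For readers less familiar with reentrancy, here is a minimal standalone sketch of the behavior the queue relies on. It is not part of InterleavedWorkQueue; `Recorder` and `demo` are hypothetical names made up for illustration. While one call into the actor is suspended, the actor is free to start or resume other pending calls, so entries from different callers may interleave without ever mutating state at the same time.

```swift
// Hypothetical illustration only: an actor whose method suspends mid-way.
actor Recorder {
    private(set) var events: [String] = []

    func work(_ name: String) async {
        events.append("\(name) start")
        // Suspension point: the actor may run other pending calls here.
        await Task.yield()
        events.append("\(name) end")
    }
}

func demo() async -> [String] {
    let recorder = Recorder()
    await withTaskGroup(of: Void.self) { group in
        for name in ["a", "b", "c"] {
            // Each child task hops onto the actor; calls never overlap,
            // but they can interleave at the `await` inside `work`.
            group.addTask { await recorder.work(name) }
        }
    }
    // All child tasks have finished by the time the group returns.
    return await recorder.events
}
```

The order of the six entries is not guaranteed and can vary between runs. Only mutual exclusion between suspension points is guaranteed.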

So my question is whether I'm thinking about this correctly. The private actor guarantees everything is started without overlapping access. Any suspension allows forward progress on another non-isolated closure. Just like withTaskGroup, the outer closure suspends while the interleaved work is running.

Based on my reading, this interface tells the compiler that non-isolated closures can interleave safely. And the internals ensure that.

Am I reading this wrong?

Here's the implementation of InterleavedWorkQueue:
public enum InterleavedWorkQueue {
    public typealias Runner = nonisolated(nonsending) (inout Group) async -> Void
    public typealias Work = nonisolated(nonsending) () async -> Void
    
    public struct Group: ~Copyable {
        fileprivate let isolation: Isolation
        fileprivate var taskGroup: TaskGroup<Void>
        
        public mutating func run(_ work: @escaping Work) {
            let isolation = self.isolation
            let w = UnsafeWork(work: work)
            taskGroup.addTask {
                await isolation.run(w)
            }
        }
    }
    
    public static nonisolated(nonsending) func run(_ runner: Runner) async {
        // If someone gets sneaky and creates a new `InterleavedWorkQueue` within an
        // existing closure passed to a different work queue, we want to make sure they all
        // share the same isolation domain.
        //
        // A task local holding a common isolation is the best way to pull this off.
        let isolation = Self.isolation ?? Isolation()
        await Self.$isolation.withValue(isolation) {
            await withTaskGroup(of: Void.self) { taskGroup in
                var g = Group(isolation: isolation, taskGroup: taskGroup)
                await runner(&g)
                await taskGroup.waitForAll()
            }
        }
    }
    
    /// The task local ensures that anyone within a worker closure's call tree who tries
    /// to create a new work queue shares the same isolation. We don't want to allow an
    /// escape hatch that constructs conflicting access to closure captures by making
    /// multiple nested queues.
    @TaskLocal
    fileprivate static var isolation: Isolation?

    /// Purposely `@unchecked Sendable` because we want to carry the non-isolated and
    /// non-sending `Work` closure into the isolating actor. We're promising to _only_
    /// invoke this on that actor so we maintain the exclusivity contract.
    fileprivate struct UnsafeWork: @unchecked Sendable {
        var work: Work
    }

    /// A private actor that provides the isolation guarantee. We unsafely carry all the
    /// provided work closures into this actor's isolation domain.
    fileprivate actor Isolation {
        func run(_ work: UnsafeWork) async {
            await work.work()
        }
    }
}

I'm really glad you asked this question. I've encountered something that feels very similar.

Basically, I had a collection of non-sendables. I wanted to kick off some work for each of them, which could happen in parallel. Doing it sequentially was easy. But allowing all of that work to actually happen in parallel was tricky.

I just wrote this off the top of my head, but I think it is pretty close to what I originally did.

@concurrent func networkOp() async {
}

class NonSendable {
	nonisolated(nonsending) func asyncWork() async {
		// stuff
		await networkOp()
		// things
	}
}

func doWork(with items: [NonSendable], isolation: isolated (any Actor)) async {
	// not ideal
	for item in items {
		await item.asyncWork()
	}

	// conceptually closer, but unsafe (and does not compile)
	await withTaskGroup { group in
		for item in items {
			// error: Passing closure as a 'sending' parameter
			group.addTask {
				await item.asyncWork()
			}
		}
	}

	// awkward and annoying, but I think it works?
	let tasks = items.map { item in
		Task {
			_ = isolation

			await item.asyncWork()
		}
	}

	for task in tasks {
		await task.value
	}
}

I know this is not identical to what you are talking about, but it does feel close. If I am understanding right, I think your desire to do this makes a lot of sense and if your queue implementation is correct, then you should be fine.

98% of the time, I use nonisolated(nonsending) to make it easier to work with non-sendable types, like you are doing here. But there is a small percentage of the time when what actually matters is I can move from caller to callee with a guarantee of no suspension. If that was the case, I'd have to think harder.

Another question is if sprinkling around Task.yield() would be sufficient. I'm not sure if it would or not, but it would certainly be simpler and would not involve any participation from the caller. Those are considerable advantages. Have you experimented with that? I did not, but now I'm starting to wonder whether I should have...

Can you explain why this is necessary? Why can’t it be @concurrent?

Yeah I think we're on the same page with our ideas. Except in your case you know the isolation you're passing to the doWork distributor. In the subsystems I'm playing around with there is no isolation declared at all.

Another question is if sprinkling around Task.yield() would be sufficient.

I'm not quite sure where you were going here, but I thought it'd be good to point out that I'm not necessarily trying to force interleaving. If nothing suspends it's totally fine in my case if things were serial.

But if anything my non-sendable pieces call happens to suspend, say for minutes at a time waiting on work, then it might be good to allow the other pieces to continue. So this is more about picking a structure where that could happen if need be.

Because then everything it captures would have to be Sendable.

Yes, and that's a huge disadvantage. I don't want to know!

I'm forced to expose a non-optional isolated param because I need to have the Task match the caller's isolation. This is a common pitfall/tradeoff/limitation of creating new tasks inside of non-sendable types.

I'm always torn on this. Because on the one hand I don't like how difficult it is. But on the other hand, this is a fairly specialized behavior and I don't know how to express it any other way. I'll admit, though, that I never considered a construct like you are using here and it's pretty interesting.

Another safe solution:

actor MyActor {
    let thing: SharedNonSendableThing

    init(_ thing: sending SharedNonSendableThing) {
        self.thing = thing
    }

    func run(fns: nonisolated(nonsending) @Sendable (SharedNonSendableThing) async -> Void ...) async {
        await withTaskGroup { group in
            for fn in fns {
                group.addTask {
                    await self._run(fn)
                }
            }
        }
    }

    func _run(_ fn: nonisolated(nonsending) (SharedNonSendableThing) async -> Void) async {
        await fn(thing)
    }
}

Introducing the MyActor._run wrapper helps make sure that:

  • It's safe to pass a non-Sendable value to nonisolated(nonsending) closures.
  • It becomes irrelevant which isolation addTask's closure runs in (I doubt that it's possible to get it working otherwise).

You run all tasks in a private actor in your code, so you know the isolation and can pass it to the isolation parameter.

Example: @mattie's approach, implemented in a standalone function
actor MyActor {}

func run(
    isolation: isolated (any Actor)? = MyActor(),
    _ thing: sending SharedNonSendableThing, 
    fns: nonisolated(nonsending) @Sendable (SharedNonSendableThing) async -> Void ...
) async {
    let tasks = fns.map { fn in
        Task {
            _ = isolation
            await fn(thing)
        }
    }
    for t in tasks {
        await t.value
    }
}
Example: similar approach, but using implicit isolation parameter `self`
actor MyActor {
    let thing: SharedNonSendableThing

    init(_ thing: sending SharedNonSendableThing) {
        self.thing = thing
    }

    func run(
        fns: nonisolated(nonsending) @Sendable (SharedNonSendableThing) async -> Void ...
    ) async {
        let tasks = fns.map { fn in
            Task {
                await fn(thing)
            }
        }
        for t in tasks {
            await t.value
        }
    }
}

Note: thing must be saved in a stored property first. It doesn't compile if thing is passed as a parameter to MyActor.run directly. I believe the difference is caused by how isolation inference works.

Could you elaborate on that a bit? I can't see what issue it would cause if the isolation parameter is Optional.

My experiments show that the approach of capturing the isolation parameter doesn't work in this case. I don't understand why it works for unstructured tasks but not for structured tasks. Does the compiler handle Task.init's closure specially?

    await withTaskGroup(isolation: isolation) { group in
        for item in items {
            _ = isolation
            group.addTask {
                _ = isolation
                await item.asyncWork()
            }
        }
    }

I think your implementation is too complex because you're trying to reimplement how multiple tasks run in an actor. The only disadvantage of the above safe approaches is that their API is probably not as good as yours. But if you're using an unsafe escape hatch anyway, I suspect there might be simpler approaches.

Optional isolated parameters, non-sendables, and Task inheritance together can be unsafe due to a long-standing issue. I just default to never accepting an optional param when using them in this configuration, but it is conceivable to me that not all arrangements are unsafe.

Yes! But that's because Task.init actually is defined differently. It makes use of @_inheritActorContext, and that is why the isolation variable capture trick works. The isolation of a closure marked with that attribute depends on its captures. addTask does not use that attribute, so the only way to influence its isolation is with a global actor annotation. And I think that could also work in this case, though I'm not sure how desirable that would be.
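To make the global actor route concrete, here is a rough sketch of what I have in mind. It assumes pinning everything to @MainActor is acceptable, which gives up the "caller picks the isolation" goal of the original post. `Counter` and `bumpAll` are hypothetical names, and I have not checked this against every compiler version.

```swift
// Hypothetical non-Sendable type, standing in for SharedNonSendableThing.
final class Counter {
    var value = 0
}

@MainActor
func bumpAll(_ counter: Counter) async {
    await withTaskGroup(of: Void.self) { group in
        for _ in 0..<3 {
            // The explicit global actor annotation fixes the child task's
            // isolation, so capturing the non-Sendable counter stays on
            // the main actor instead of crossing an isolation boundary.
            group.addTask { @MainActor in
                await Task.yield() // Simulate suspending work.
                counter.value += 1
            }
        }
    }
}
```

Each child task still runs on the main actor, so the increments never overlap, but a suspension in one child lets the others make progress.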

The problem with this example is the @Sendable annotation. I'm trying to explicitly run asynchronous closures and not mark them as sendable. The goal is to have exclusive access to captures.