Concurrency: suspending an actor async func until the actor meets certain conditions

I have a database actor which vends transaction actors. These can run serially or concurrently. Through a transaction the caller can update the database async-wise.

Updating the database schema should only be done when there are no active transactions. When a schema migration is started I should therefore wait for all current transactions to complete (and refuse starting new ones).

A first approach can be seen in the snippet below.
Using Taks.sleep works but seems a pre-actor way of doing things.
With Task.yield the loop runs basically continously. (Should have seen that coming, I guess, because a transaction just sits around waiting for work or a final commit call.)

I found this thread but that is about emitting values in a producer/consumer pattern. In my case I have to wait and then run once before returning to the caller. (This will probably evolve into a Migration actor which has exclusive access so that multiple migration steps can run in series but asynchronously.)

Other approaches:
I considered storing the migration for later use. Each time a transaction is completed I would check the number of active transactions. If zero I run the stored migration. But then migrate func would return immediately. The caller might assume the new schema is in place and do followup "calls".

I also thought about using TaskGroup. Each time a transaction starts, add something to a group. When the migration starts, it would await the group.

Any patterns I could/should utilise here?
Thanks in advance for your advice/suggestions.

actor MyActor
{
	enum State
	{
		case busy
		case ready
	}

	var active = 0
	var state = State.ready
	func migrate() async throws
	{
		guard state == .ready else { return }
		state = .busy
		print("waiting \(active)")
		while active > 0
		{
//			await Task.yield() //runs way too often
			try await Task.sleep(nanoseconds: 1_000_000_000 * 1) //ugly
			print("check \(active)")
		}
		print("migrating \(active)")
		state = .ready
	}

	func test(_ msg: String, sleep: UInt64 = 2) async throws
	{
		guard state == .ready else { return }
		active += 1
		print("sleep \(msg)")
		try await Task.sleep(nanoseconds: 1_000_000_000 * sleep)
		print("awake \(msg)")
		active -= 1
	}
}

and to run it:

    static func main() async
	{
		do
		{
			let actor = MyActor()
			let taskA = Task { try await actor.test("foo", sleep: 3) }
			let taskB = Task { try await actor.test("bar") }
			try await Task.sleep(nanoseconds: 1_000_000_0 * 5) //otherwise migrate run first
			let taskM = Task { try await actor.migrate()   }   //should wait for A and B
			let taskC = Task { try await actor.test("food") }  //should not run
			let taskD = Task { try await actor.test("bars") }  //should not run
			print("done")
			try await Task.sleep(nanoseconds: 1_000_000_000 * 5)
		}
		catch { print(error) }
	}

There's no existing pattern for this kind of "wait", other than the pseudo-wait via Task.sleep. I think the forthcoming Task.select might be helpful for this kind of thing, in the minimal case of suspending on a single outstanding task.

The only current solutions AFAIK are: use one of the with…Continuation functions at the deepest level of waiting, essentially bridging from the old completion-handler pattern back to Swift concurrency; or, use a custom AsyncStream to deliver completions of events you're waiting on.

You do face a second problem, though. Using await inside an actor is very bad news because actors are re-entrant. At the suspension point of such an await, other actor-isolated functions and accessors can run, so the awaiting function is not completely isolated as you might expect. This is fine if your actor has no breakable invariants that can be compromised across an internal suspension point, but that's very hard to ensure — and it pretty much takes away the isolation advantages that actors were intended to provide.

Proceed with extreme caution. :slight_smile:

3 Likes