Concurrency: suspending an actor async func until the actor meets certain conditions

I have a database actor which vends transaction actors. These can run serially or concurrently. Through a transaction the caller can update the database async-wise.

Updating the database schema should only be done when there are no active transactions. When a schema migration is started I should therefore wait for all current transactions to complete (and refuse starting new ones).

A first approach can be seen in the snippet below.
Using Taks.sleep works but seems a pre-actor way of doing things.
With Task.yield the loop runs basically continously. (Should have seen that coming, I guess, because a transaction just sits around waiting for work or a final commit call.)

I found this thread but that is about emitting values in a producer/consumer pattern. In my case I have to wait and then run once before returning to the caller. (This will probably evolve into a Migration actor which has exclusive access so that multiple migration steps can run in series but asynchronously.)

Other approaches:
I considered storing the migration for later use. Each time a transaction is completed I would check the number of active transactions. If zero I run the stored migration. But then migrate func would return immediately. The caller might assume the new schema is in place and do followup "calls".

I also thought about using TaskGroup. Each time a transaction starts, add something to a group. When the migration starts, it would await the group.

Any patterns I could/should utilise here?
Thanks in advance for your advice/suggestions.

actor MyActor
{
	enum State
	{
		case busy
		case ready
	}

	var active = 0
	var state = State.ready
	func migrate() async throws
	{
		guard state == .ready else { return }
		state = .busy
		print("waiting \(active)")
		while active > 0
		{
//			await Task.yield() //runs way too often
			try await Task.sleep(nanoseconds: 1_000_000_000 * 1) //ugly
			print("check \(active)")
		}
		print("migrating \(active)")
		state = .ready
	}

	func test(_ msg: String, sleep: UInt64 = 2) async throws
	{
		guard state == .ready else { return }
		active += 1
		print("sleep \(msg)")
		try await Task.sleep(nanoseconds: 1_000_000_000 * sleep)
		print("awake \(msg)")
		active -= 1
	}
}

and to run it:

    static func main() async
	{
		do
		{
			let actor = MyActor()
			let taskA = Task { try await actor.test("foo", sleep: 3) }
			let taskB = Task { try await actor.test("bar") }
			try await Task.sleep(nanoseconds: 1_000_000_0 * 5) //otherwise migrate run first
			let taskM = Task { try await actor.migrate()   }   //should wait for A and B
			let taskC = Task { try await actor.test("food") }  //should not run
			let taskD = Task { try await actor.test("bars") }  //should not run
			print("done")
			try await Task.sleep(nanoseconds: 1_000_000_000 * 5)
		}
		catch { print(error) }
	}

There's no existing pattern for this kind of "wait", other than the pseudo-wait via Task.sleep. I think the forthcoming Task.select might be helpful for this kind of thing, in the minimal case of suspending on a single outstanding task.

The only current solutions AFAIK are: use one of the with…Continuation functions at the deepest level of waiting, essentially bridging from the old completion-handler pattern back to Swift concurrency; or, use a custom AsyncStream to deliver completions of events you're waiting on.

You do face a second problem, though. Using await inside an actor is very bad news because actors are re-entrant. At the suspension point of such an await, other actor-isolated functions and accessors can run, so the awaiting function is not completely isolated as you might expect. This is fine if your actor has no breakable invariants that can be compromised across an internal suspension point, but that's very hard to ensure — and it pretty much takes away the isolation advantages that actors were intended to provide.

Proceed with extreme caution. :slight_smile:

3 Likes

What you are describing sounds very similar to a concurrent DispatchQueue with a barrier work item. I don't think it is a good idea to mix DispatchQueues with modern concurrency so I have created a modern concurrency equivalent. I have also created a sample test that demonstrates your scenario.

To implement this functionality on your own you need to track two things basically how many active transactions are running and suspend upcoming update database schema task if your transactions task count greater than zero. Also, you need to keep track if any migration is running and suspend your incoming transactions until migration is complete. You can use continuations to suspend any task and store the continuation to be resumed/cancelled later.

Sorry I haven’t responded thus far.
First of all, thank you and QuinceyMorris for the replies.

I got something that seems to work fairly quickly after QuinceyMorris reply. I’ll study your solution in detail too,

It’s summer time and I have less available free time unfortunately. I’ve also been working on non-concurrency related issues. Currently I’m figuring out how to best deal with the schema and type safety.. The DB stores everything as enum values and strings with a strict order while the graph model uses types, CodingKeys, etc. There is more friction than I like between those worlds. So I’m refactoring while treating it as a purely local database before adding the rest back to it.

Ah well, once 5.7 is released and I have refactored it to use the cool new features, then perhaps I’ll post about my progress and experiences thus far.
It’s an interesting project and I’m still at it.