Making an actor non-reentrant

One possible way is to set up a stream that enqueues calls to `foo()`, as below:

/// An actor whose `foo()` invocations never overlap.
///
/// Every call to `foo()` is wrapped in a `QueueRequest` and pushed into an
/// `AsyncStream`. A single long-lived task drains that stream and awaits each
/// job to completion before pulling the next one, so the body of
/// `nonReentrantFoo()` is never entered again while a previous call is
/// suspended inside it.
actor MyActor {
    /// One pending `foo()` invocation waiting in the queue.
    private struct QueueRequest {
        /// Resumed after `job` has finished, which lets the suspended caller return.
        var continuation: CheckedContinuation<Void, Never>
        /// The work to run. Marked `@Sendable` because the request is created by
        /// the caller and executed by the queue task.
        var job: @Sendable () async -> Void
    }

    // Assigned exactly once in `init`, so a plain non-optional `let` is enough;
    // an implicitly-unwrapped optional would only add a potential crash path.
    private let queueContinuation: AsyncStream<QueueRequest>.Continuation
    /// The task that consumes the queue, one request at a time.
    private let queueTask: Task<Void, Never>

    init() {
        // Prepare the queue of foo() invocations
        let (stream, queueContinuation) = AsyncStream.makeStream(of: QueueRequest.self)
        self.queueContinuation = queueContinuation
        // The task captures only `stream`, not `self`, so it does not keep the
        // actor alive on its own.
        self.queueTask = Task {
            for await request in stream {
                // Run the job to completion before looking at the next request:
                // this is what serializes the calls.
                await request.job()
                request.continuation.resume()
            }
        }
    }

    deinit {
        // Stop draining the queue once the actor goes away.
        queueTask.cancel()
    }

    /// Enqueues one run of `nonReentrantFoo()` and suspends until that run has finished.
    func foo() async {
        await withCheckedContinuation { continuation in
            // The job references `self`, so a pending request keeps the actor
            // alive until the job has run.
            let request = QueueRequest(continuation: continuation) {
                await self.nonReentrantFoo()
            }
            queueContinuation.yield(request)
        }
    }

    // This function is not reentrant
    private func nonReentrantFoo() async {
        print("Start foo")
        try? await Task.sleep(for: .seconds(1))
        print("End foo")
    }
}
Quick check
/// Quick manual check that concurrent calls to `MyActor.foo()` do not interleave.
final class MyActorTests: XCTestCase {
    /// Starts two child tasks that call `foo()` on the same actor and waits for both.
    ///
    /// The check is visual: the test makes no assertions, the expected console
    /// output is listed below.
    func testFooIsNotReentrant() async throws {
        let a = MyActor()

        // Prints:
        // - Start foo
        // - End foo
        // - Start foo
        // - End foo
        await withTaskGroup(of: Void.self) { group in
            group.addTask {
                await a.foo()
            }
            group.addTask {
                await a.foo()
            }
        }
    }
}

You can also use a semaphore (several implementations are available online, including mine), but beware of priority inversion:

import Semaphore

/// Variant of `MyActor` that guards `foo()` with an `AsyncSemaphore` created
/// with a value of 1.
actor MyActor {
    private let fooSemaphore = AsyncSemaphore(value: 1)

    /// Waits on the semaphore, runs the guarded work, then signals the semaphore.
    func foo() async {
        await fooSemaphore.wait()
        await runGuardedWork()
        // Nothing in the guarded work throws or returns early, so signalling
        // here is reached on every path.
        fooSemaphore.signal()
    }

    /// The work that must never be entered in a reentrant way; `foo()` calls it
    /// only between the semaphore's `wait()` and `signal()`.
    private func runGuardedWork() async {
        print("Start foo")
        try? await Task.sleep(for: .seconds(1))
        print("End foo")
    }
}

And see also ConcurrencyRecipes/Recipes/Structured.md at main · mattmassicotte/ConcurrencyRecipes · GitHub

8 Likes