Actor Reentrancy

For anyone who finds this later, this is what I ended up using as my solution in this space from the advice given here.

Still worth some analysis from more experienced parties, but so far as I can tell this should still allow Task/thread suspension and continuation such that no thread gets tied up during lock contention.

import Collections

public actor AsyncSemaphore {
    private var count: Int
    private var waiters: Deque<CheckedContinuation<Void, Never>> = []

    public init(count: Int = 1) {
        self.count = count
    }

    public func wait() async {
        count -= 1
        if count >= 0 { return }
        await withCheckedContinuation {
            waiters.append($0)
        }
    }

    public func signal() {
        self.count += 1
        guard let first = waiters.popFirst() else { return }
        first.resume()
    }
}

public actor SharedState<T> {
    private var sem = AsyncSemaphore(count: 1)
    public var state: T
    public init(_ state: T) {
        self.state = state
    }

    public func access(_ closure: (isolated SharedState) async throws -> Void) async rethrows {
        await sem.wait()
        defer { Task { await sem.signal() } }
        try await closure(self)
    }
}

public extension AsyncSemaphore {
    func withLock<T>(_ closure: () async throws -> T) async rethrows -> T {
        await wait()
        defer { signal() }
        return try await closure()
    }

    func withLockVoid(_ closure: () async throws -> Void) async rethrows {
        await wait()
        defer { signal() }
        try await closure()
    }
}
4 Likes