Actor Reentrancy

I am reading the Actors Proposal (SE-0306), and I am totally confused :confused:

I have no problem understanding this:

Actor isolation

The second form of permissible cross-actor reference is one that is performed with an asynchronous function invocation. Such asynchronous function invocations are turned into "messages" requesting that the actor execute the corresponding task when it can safely do so. These messages are stored in the actor's "mailbox", and the caller initiating the asynchronous function invocation may be suspended until the actor is able to process the corresponding message in its mailbox. An actor processes the messages in its mailbox sequentially, so that a given actor will never have two concurrently-executing tasks running actor-isolated code. This ensures that there are no data races on actor-isolated mutable state, because there is no concurrency in any code that can access actor-isolated state.

But my confusion starts after reading this:

Actor reentrancy

Actor-isolated functions are reentrant. When an actor-isolated function suspends, reentrancy allows other work to execute on the actor before the original actor-isolated function resumes, which we refer to as interleaving.
...

Since an actor processes messages in its input queue sequentially and one at a time, how can other work be executed on the same actor?

I feel that I have missed something. :confused:

Could anyone unconfuse me please?

That's actually very easy:

actor Foo {
  var state = 0

  func work() async {
    // assume it's still 0 at this time
    print(state)

    // suspension point of `work`
    await doSomeLongWork() 

    // this is no longer guaranteed to be `0` at this point
    print(state) 
  }
  
  func setState(to newValue: Int) {
    state = newValue
  }
}

As far as I can tell, an actor only guarantees that it executes one task at a time, but if that task gets suspended, other tasks get an opportunity to run, regardless of whether the previous task has finished. The actor only needs to guarantee that no tasks run in parallel. If we always waited for the active-but-suspended task to eventually finish, we would have a good chance of eventually hitting deadlocks. So while doSomeLongWork is suspended, another task can access the actor and potentially use setState(to:) to mutate the value. Hence, when doSomeLongWork resumes, state is no longer guaranteed to have the same value as it did before the suspension point.
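
Here is a minimal, self-contained sketch of that interleaving (doSomeLongWork is just a stand-in that sleeps to force a suspension point, and the snippet assumes an async context such as top-level code):

actor Foo {
  var state = 0

  func work() async {
    print("before suspension:", state)
    await doSomeLongWork() // suspension point: other work may interleave here
    print("after suspension:", state)
  }

  func setState(to newValue: Int) {
    state = newValue
  }

  // Stand-in for any real async work; sleeping is enough to force a suspension.
  private func doSomeLongWork() async {
    try? await Task.sleep(nanoseconds: 1_000_000_000)
  }
}

let foo = Foo()

// Start work(); it suspends at doSomeLongWork().
async let running: Void = foo.work()

// While work() is suspended, this call can interleave on the actor.
await foo.setState(to: 42)

await running

// Typical output (the exact interleaving depends on scheduling):
//   before suspension: 0
//   after suspension: 42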

Feel free to correct me or the terminology I used in case there is something wrong with that. ;)


Beyond the possibility that someone will call setState() while work() is suspended, it's also possible for something to invoke work() itself again while it's suspended.
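
For example, reusing the Foo sketch above (with doSomeLongWork standing in for any real suspension), two overlapping calls to work() can interleave:

let foo = Foo()

// Both calls target the same actor instance; the second call's first half
// can run while the first call is still suspended in doSomeLongWork().
async let first: Void = foo.work()
async let second: Void = foo.work()
await first
await second

// One possible output:
//   before suspension: 0
//   before suspension: 0
//   after suspension: 0
//   after suspension: 0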


Following is my understanding:

  • Synchronous functions on an actor run synchronously
  • Asynchronous functions on an actor can suspend (when encountering await) which gives an opportunity for other asynchronous functions (of the same actor or others) to run

Code

import Foundation

actor A {
    func f1() {
        print("f1 started")
        for _ in 1..<4_000_000 {}
        print("f1 ended")
    }
    
    func f2() async {
        print("f2 started")
        try? await Task.sleep(nanoseconds: 4_000_000_000)
        print("f2 ended")
    }

    func f3() async {
        print("f3 started")
        try? await Task.sleep(nanoseconds: 4_000_000_000)
        print("f3 ended")
    }
}

let a = A()

Task.detached {
    await a.f1()
}

Task.detached {
    await a.f2()
}

Task.detached {
    await a.f3()
}

RunLoop.main.run()

Output

f1 started
f1 ended
f2 started
f3 started
f3 ended
f2 ended

Great explanations so far, perhaps allow me to address one thing specifically:

Well, because a "message" in this case is not necessarily an entire function. In the case of an asynchronous function, it's basically the blocks separated by awaits. In @DevAndArtist's example the work function basically becomes two "messages". And I think it is important to understand that the second "message" only "arrives" at the actor once the await doSomeLongWork() is done.
So once the first message (the first print(state)) is done, the actor is effectively idle. Any other of its functions (including work) can be called, thus "re-entering" the actor again and resulting in it processing another "message".
Then, eventually, doSomeLongWork returns, giving the actor yet another "message" to handle, this time for the second print(state).

I guess the confusion stems from comparing this with "the olden ways of DispatchQueue": the two "messages" of work are not immediately enqueued onto something that matches a serial DispatchQueue. If that were the case, you would of course not be able to call setState and have it affect the property before the second part of work is done, but that would also block the entire actor until doSomeLongWork returns.
Instead it works as I described above, or in other words: the compiler is way smarter when it comes to compiling async functions and interpreting the await.

I'm not sure why you're saying that; dispatch queues have the exact same behavior in that regard:

func work() {
    queue.async {
        // first piece of work
        doSomethingAsynchronously(callBackOn: queue) {
            // second piece of work
        }
    }
}

work() can be called in a reentrant fashion before resuming from doSomethingAsynchronously() and executing the second piece of work.

Ah, of course that one is reentrant, but I meant this differently:
I think that a lot of people who are only familiar with dispatch queues would interpret the explanations about actors, isolation, and the "messages" such that isolated async functions are "split" into "messages" and enqueued when the method is called.

So this:

func work() async {
    print("First print: \(state)")
    await doSomeLongWork() 
    print("Second print: \(state)") 
}

would become this:

func work() {
    // as the "mailbox" processes messages sequentially an actor would have a serial queue:
    theActorsSerialQueue.async {
        print("First print: \(state)")
        doSomeLongWork()
    }
    theActorsSerialQueue.async {
        print("Second print: \(state)")
    }
}

Of course work is technically still reentrant, but since both "messages" it was "split into" have already been queued onto a serial queue, there's no way the second call's first part can be executed before the first call's second part is done.

I think (?) a better way to think of the actor isolation and async/await mechanic, if you want to mentally "wrap" it into queue-like code, would be this (which is exactly what you wrote, but again, my hunch is that people don't realize this is how actors and await actually work):

func work() {
    theActorsSerialQueue.async {
        print("First print: \(state)")
        doSomeLongWorkNowWithCallback() {
            theActorsSerialQueue.async {
                print("Second print: \(state)")
            }
        }
    }
}

This way, reentrant calls to work can be processed before the second "message" is queued onto the actor's serial isolation queue.
But my hunch is that on first read people think it would do it in the first way.
"Re-entrant" in this context then is less of a way to express that you can call any function repeatedly to schedule it several times, but more that the actor's scheduling "queue" is able to "re-enter" at various points (if you "schedule" async functions, it gets more "re-entry-opportunities" than just at the function call itself).

This is only true if work() cannot be called concurrently. Otherwise you still have a possible execution in-between the two parts.

Sure, but that is a whole other can of worms. I just wanted to describe how I believe people who are more familiar with dispatch queues interpret the more modern syntax of actors and async/await. Of course, since work itself is defined on an actor you get this trap prevented "for free", but that part, I think, people understand quickly and don't "translate" into queue-code in their minds.

I wasn't trying to rewrite the entire exact same thing using dispatch queues here, just trying to highlight the differences.

I'm curious what some more experienced people think of the solution I threw together for this issue.

My goal was to make a generic structure that I could wrap around anything and provide async usable mutexing without needlessly spin-locking waiting threads.

My hope is that in the absolute worst case, the below code will only spin-lock 1 thread/actor (the ActorSemaphore). The remaining threads/actors will merely be suspended and allowed to do other Tasks.

Again, I'm hoping someone can poke some holes in my assumptions if there's something I'm not understanding.

import Foundation

// Spin up a bunch of concurrent accesses
// to contend on the shared state.
let state = SharedState(0)
await withTaskGroup(of: Void.self) {
    for _ in 0..<10000 {
        $0.addTask {
            await state.access({
                print("Access: pre-increment")
                $0.state = await delayedIncrement($0.state)
                print("Access: post-increment")
            })
        }
    }
    await $0.waitForAll()
}
await state.access({
    print($0.state) // Should be `10000`
})

func delayedIncrement(_ n: Int) async -> Int {
    print("Incrementing \(n)")
    // Uncomment to simulate an actual delay
//    await Thread.sleep(forTimeInterval: 1)
    return n + 1
}

// MARK: Relevant Data Structures

actor ActorSemaphore {
    private let sem: DispatchSemaphore

    init(value: Int) {
        sem = DispatchSemaphore(value: value)
    }

    // Default isolation so that only one thread
    // ever runs this work.
    // All `wait` tasks get enqueued on one thread
    // so that other threads can suspend and do
    // other work.
    func wait() {
        print("Sem:wait:pre: ", Thread.current)
        sem.wait()
        print("Sem:wait:post: ", Thread.current)
    }

    // Nonisolated so that ANY thread can release
    // the lock.
    nonisolated func signal() {
        print("Sem:sig:pre: ", Thread.current)
        sem.signal()
        print("Sem:sig:post: ", Thread.current)
    }
}

actor SharedState<T> {
    private var sem = ActorSemaphore(value: 1)
    public var state: T
    public init(_ state: T) {
        self.state = state
    }

    // We artificially isolate `SharedState` using the `ActorSemaphore`
    // Even under re-entrant conditions, the re-entering actor will
    // suspend behind the `ActorSemaphore` actor.
    public func access(_ closure: (isolated SharedState) async throws -> Void) async rethrows {
        await sem.wait()
        print("Pre-closure: ", Thread.current)
        defer {
            print("Post-closure(\(state)): ", Thread.current)
            sem.signal()
        }
        try await closure(self)
    }
}

I might be wrong now, but using a DispatchSemaphore in an actor has proved to be problematic in the past when I was trying to do something similar.

I think the workaround was to use continuations instead. (When I find the example code, I will add it here.)

@John_McCall, could you shed some light on this to educate us please?

Code Using Continuations
// Channel.swift

extension V3 {
    actor Channel<T> {
        private var store = [T?]()
        private var waiters = [UnsafeContinuation<T?, Never>]()

        func put(_ u: T?) {
            store.append(u)
            if !waiters.isEmpty {
                waiters.removeFirst().resume(returning: store.removeFirst())
            }
        }

        func get() async -> T? {
            if store.isEmpty {
                return await withUnsafeContinuation {
                    waiters.append($0)
                }
            } else {
                return store.removeFirst()
            }
        }
        
        var state: (packets: Int, waiters: Int) {
            return (store.count, waiters.count)
        }
        
        var description: String {
            return "\(state)"
        }
    }
}
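
For illustration, here is a hypothetical usage sketch of that Channel (assuming an enclosing enum V3 {} namespace and an async context):

enum V3 {} // assumed namespace; the extension above compiles against it

let channel = V3.Channel<Int>()

// Consumer: if the store is empty, get() suspends on a stored continuation
// instead of blocking a thread.
async let received: Int? = channel.get()

// Producer: put(_:) either resumes a waiting consumer or buffers the value.
await channel.put(1)

if let value = await received {
    print("received:", value) // received: 1
}
print(await channel.state)    // (packets: 0, waiters: 0)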

Original Example

Source

actor Semaphore {
    private var count: Int
    private var waiters: [CheckedContinuation<Void, Never>] = []

    init(count: Int = 0) {
        self.count = count
    }

    func wait() async {
        count -= 1
        if count >= 0 { return }
        await withCheckedContinuation {
            waiters.append($0)
        }        
    }

    func release(count: Int = 1) {
        assert(count >= 1)
        self.count += count
        for _ in 0..<count {
            if waiters.isEmpty { return }
            waiters.removeFirst().resume()
        }
    }
}

You definitely should not be blocking threads on arbitrary future work like this, no.


I think the hole is that, while you may be getting away with it in your example here by blocking one thread, you also plan to use this type in many places across your codebase, which would be the surest way to starve the thread pool and potentially deadlock the program.


If you want to use a semaphore in order to prevent reentrancy, but not a semaphore that blocks threads, you can use one that is tailored for Swift concurrency, such as groue/Semaphore. The usage is the same:

func someMethod() async {
    await sem.wait()
    defer { sem.signal() }

    // Now do work in a non-reentrant way
}

I hadn't considered using continuations directly to get the functionality I needed - oversight on my part.
This definitely seems like the most elegant path to take though.
Thanks for this!

For anyone who finds this later, this is what I ended up using as my solution in this space from the advice given here.

Still worth some analysis from more experienced parties, but so far as I can tell this should still allow Task/thread suspension and continuation such that no thread gets tied up during lock contention.

import Collections

public actor AsyncSemaphore {
    private var count: Int
    private var waiters: Deque<CheckedContinuation<Void, Never>> = []

    public init(count: Int = 1) {
        self.count = count
    }

    public func wait() async {
        count -= 1
        if count >= 0 { return }
        await withCheckedContinuation {
            waiters.append($0)
        }
    }

    public func signal() {
        self.count += 1
        guard let first = waiters.popFirst() else { return }
        first.resume()
    }
}

public actor SharedState<T> {
    private var sem = AsyncSemaphore(count: 1)
    public var state: T
    public init(_ state: T) {
        self.state = state
    }

    public func access(_ closure: (isolated SharedState) async throws -> Void) async rethrows {
        await sem.wait()
        defer { Task { await sem.signal() } }
        try await closure(self)
    }
}

public extension AsyncSemaphore {
    func withLock<T>(_ closure: () async throws -> T) async rethrows -> T {
        await wait()
        defer { signal() }
        return try await closure()
    }

    func withLockVoid(_ closure: () async throws -> Void) async rethrows {
        await wait()
        defer { signal() }
        try await closure()
    }
}
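
And a hypothetical usage sketch of the above, mirroring the task-group stress test from earlier in the thread (assuming an async context such as top-level code):

let shared = SharedState(0)

await withTaskGroup(of: Void.self) { group in
    for _ in 0..<10_000 {
        group.addTask {
            await shared.access { s in
                // Runs isolated to the SharedState actor; the AsyncSemaphore
                // keeps these closures from interleaving with each other.
                s.state += 1
            }
        }
    }
    await group.waitForAll()
}

await shared.access { print($0.state) } // 10000

The withLock/withLockVoid helpers wrap the same wait/signal pairing for callers that use an AsyncSemaphore directly.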