Actor Races

I wonder if this workaround could help, and if not, why? Syntax-wise it can be made to look exactly like a dispatch async call:

func downloadAndStore() async -> Result {
    return await queue.async {
        let download = await loadWebResource("data.txt")
        return await database.store(download)
    }
}

Being just one level deep it's hardly a "pyramid" of doom.

I think that would be 'an Actor in name only'. AFAICT there would be no functional difference if this was defined on a class or an actor – so may as well use a class.

In the 'perfect' case where an actor is protecting some simple resource, I wouldn't expect any of its methods to be annotated async, but all callers must mark their calls with await regardless, as the Actor guarantees that only one task is operating on it at any one time.
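
A minimal sketch of that 'perfect' case (Counter is a made-up example, not from the proposal): none of the actor's methods are async, yet callers outside the actor still write await.

actor Counter {
    private var value = 0

    // Synchronous on the actor: no suspension points inside.
    func increment() -> Int {
        value += 1
        return value
    }
}

// From outside the actor the call is still marked with await,
// because the caller may have to wait for its turn on the actor.
func bump(_ counter: Counter) async -> Int {
    await counter.increment()
}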

The trouble is that if, at any point, the actor suspends a task it's running – by calling another Actor or resource with some kind of asynchronous API – that can lead to 'interleaving'.
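
And a sketch of that interleaving hazard, with stubbed-out async helpers standing in for the network and database calls (the names are placeholders, not real APIs):

func loadWebResource(_ name: String) async -> String { name }   // stub
actor Database { func store(_ value: String) {} }                // stub
let database = Database()

actor Downloader {
    func downloadAndStore() async {
        // While this call is suspended at either await, the actor is free to
        // start running other calls, so two downloadAndStore() invocations
        // can interleave even though the actor is 'single-threaded'.
        let download = await loadWebResource("data.txt")
        await database.store(download)
    }
}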

A workaround for this might be a semaphore along the lines of:

func downloadAndStore() -> Result {
    let semaphore = DispatchSemaphore(value: 0)
    var result: Result!
    Task {
        let download = await loadWebResource("data.txt")
        result = await database.store(download)
        semaphore.signal()
    }
    semaphore.wait()
    return result
}

(Or something along those lines, haven't tested.)

This essentially blocks the Task at the wait call until the nested task calls signal. But I'm not sure whether this breaks the 'contract' that says a Swift Task is not supposed to block (async worker threads must always make forward progress).

1 Like

Yeah, if you use a semaphore like that 4 times on a 4 core device, it'll block all other actors/tasks (except the main one I believe) until it's done. If the things you're waiting for need to use Swift concurrency themselves to finish, then it could deadlock, since they won't be able to get a thread to run on. (Concretely, I've seen this happen with libdispatch+XPC, where the XPC reply handler never got to run)

7 Likes

Ouch. Thanks for letting me know. What is the recommended way to handle this then? GCD?

A queue is a reasonable option, yeah (they also have thread limits but they're much higher). Once we have custom executors they may be applicable here. It's also probably reasonable for most apps to block one Swift thread at a time, so having a singleton actor you run blocking stuff on isn't the worst thing in the world.
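
A rough sketch of that last idea (the names here are mine, not an established API): a singleton actor whose only job is to run blocking work, so at most one cooperative thread is stuck inside it at any time.

import Foundation

actor BlockingWorker {
    static let shared = BlockingWorker()

    // Runs a synchronous, possibly blocking closure while isolated to this actor.
    // Callers queue up behind each other, so only one thread blocks at a time.
    func run<T: Sendable>(_ work: @Sendable () throws -> T) rethrows -> T {
        try work()
    }
}

// Hypothetical usage:
// let bytes = try await BlockingWorker.shared.run { try Data(contentsOf: someFileURL) }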

3 Likes

Perhaps not as written, but in this example:

var cache: [String: Result] = [:]
...
func downloadAndStore(key: String) async -> Result {
    if let result = cache[key] {
        return result
    }
    return await queue.async {
        let download = await loadWebResource(key)
        let result = await database.store(download)
        cache[key] = result
        return result
    }
}

the difference compared to a class would be automatic protection of the "cache" variable. The class version would have to protect it with a mutex or some other means.
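
For comparison, a sketch of the manual protection the class version would need (NSLock here; DownloadResult is a placeholder for whatever the real result type is):

import Foundation

struct DownloadResult {}   // placeholder result type

final class DownloadCache {
    private var cache: [String: DownloadResult] = [:]
    private let lock = NSLock()

    // Every access to `cache` has to go through the lock by hand;
    // an actor gives this protection automatically.
    func result(for key: String) -> DownloadResult? {
        lock.lock()
        defer { lock.unlock() }
        return cache[key]
    }

    func setResult(_ result: DownloadResult, for key: String) {
        lock.lock()
        defer { lock.unlock() }
        cache[key] = result
    }
}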

One can define a semaphore with actors.

In the sample code below, a Semaphore is created with the number of jobs it can run concurrently (one, two, one hundred, as many as you want).

To run a job, you can call:

// One concurrent job
let semaphore = Semaphore(value: 1)

Task {
    // wait for a slot
    await semaphore.wait()
    // run
    await <your async function>
    // signal end of job
    await semaphore.signal()
}

As a convenience, you can write:

// One concurrent job
let semaphore = Semaphore(value: 1)

Task {
    await semaphore.run {
        await <your async function>
    }
}

See the full sample code below (this is a quick draft, I'm not sure it is 100% reliable):

public actor Semaphore {
    private var value: Int
    private var continuations: [UnsafeContinuation<Void, Never>] = []

    init(value: Int) {
        self.value = value
    }

    /// Waits for, or decrements, a semaphore.
    ///
    /// Decrement the counting semaphore. If the resulting value is less than
    /// zero, this function waits for a signal to occur before returning.
    public func wait() async {
        if value <= 0 {
            await withUnsafeContinuation { continuation in
                continuations.insert(continuation, at: 0)
            }
        }
        assert(value > 0)
        value -= 1
    }

    /// Signals (increments) a semaphore.
    ///
    /// Increment the counting semaphore. If the previous value was less than
    /// zero, this function wakes a task currently waiting in ``wait()``.
    @discardableResult
    public func signal() -> Int {
        value += 1
        if value > 0, let continuation = continuations.popLast() {
            continuation.resume()
        }
        return value
    }

    /// Convenience method that waits, run an async function, and signals.
    public func run<T>(execute: @Sendable @escaping () async throws -> T) async rethrows -> T {
        await wait()
        defer { signal() }
        return try await execute()
    }
}

// Demo

Task {
    do {
        print("Start, 1 concurrent task")
        let semaphore = Semaphore(value: 1)
        await withThrowingTaskGroup(of: Void.self) { group in
            for i in 0..<10 {
                group.addTask {
                    try await semaphore.run {
                        print("Start task \(i)")
                        try await Task.sleep(nanoseconds: 1_000_000_000)
                        print("End task \(i)")
                    }
                }
            }
        }
        print("Done")
        await semaphore.run {
            print("One last because it's fun")
        }
    }
    
    do {
        print("Start, 2 concurrent tasks")
        let semaphore = Semaphore(value: 2)
        await withThrowingTaskGroup(of: Void.self) { group in
            for i in 0..<10 {
                group.addTask {
                    try await semaphore.run {
                        print("Start task \(i)")
                        try await Task.sleep(nanoseconds: 1_000_000_000)
                        print("End task \(i)")
                    }
                }
            }
        }
        print("Done")
        await semaphore.run {
            print("One last because it's fun")
        }
    }
    
    do {
        print("Start, 100 concurrent tasks")
        let semaphore = Semaphore(value: 100)
        await withThrowingTaskGroup(of: Void.self) { group in
            for i in 0..<10 {
                group.addTask {
                    try await semaphore.run {
                        print("Start task \(i)")
                        try await Task.sleep(nanoseconds: 1_000_000_000)
                        print("End task \(i)")
                    }
                }
            }
        }
        print("Done")
        await semaphore.run {
            print("One last because it's fun")
        }
    }
}
6 Likes

Along these lines, maybe the best current solution is just to use another actor as a TaskQueue, like this:

actor Queue {
    private var tasks: [() async -> Void] = []
    
    func enqueue(_ task: @escaping () async -> Void) {
        tasks.append(task)
        Task { await runNext() }
    }
    
    private func runNext() async {
        guard !tasks.isEmpty else { return }
        let task = tasks.removeFirst()
        await task()
        if !tasks.isEmpty {
            Task { await runNext() }
        }
    }
}


actor Downloader {
    private let queue = Queue()
    
    func downloadAndStore() async {
        await queue.enqueue { [self] in
            let data = await download()
            await store(data)
        }
    }
    
    private func download() async -> Data { print("downloading") ; return Data() }
    private func store(_ data: Data) async { print("storing") }
}


let downloader = Downloader()
await downloader.downloadAndStore()

Presumably downloadAndStore now completes as a transaction, and this solves my problem.

Anyone see a problem with this?

2 Likes

I hear custom executors mentioned on several threads, but there is nothing on Swift Evolution, and there hasn't been any activity on the pitch in over a year.
Are there any blocking factors, or is this feature just not considered a high priority?

2 Likes

Interesting... how would you use that in the context of the example above (added below) with the old DispatchSemaphore, though? It requires an async context, right?

actor SomeActor {
...
    func downloadAndStore() -> Result {
        let semaphore = DispatchSemaphore(value: 0)
        var result: Result!
        Task {
            let download = await loadWebResource("data.txt")
            result = await database.store(download)
            semaphore.signal()
        }
        semaphore.wait()
        return result
    }
}

That looks interesting, I guess Downloader no longer needs to be an Actor, though? Or is there some other state being managed?

True. Downloader could be a class, and the code would then be very similar to yours, with an actor queue instead of dispatch.
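
Something like this sketch, reusing the Queue actor from earlier in the thread (untested, and strict concurrency checking may want Downloader to be Sendable):

import Foundation

final class Downloader {
    private let queue = Queue()   // the Queue actor defined above

    func downloadAndStore() async {
        await queue.enqueue { [self] in
            let data = await download()
            await store(data)
        }
    }

    private func download() async -> Data { print("downloading"); return Data() }
    private func store(_ data: Data) async { print("storing") }
}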

My original problem was having actor functions be transactional, and this seems like a way to do it.

1 Like

Do you want to serialize downloads?

let downloadSemaphore = Semaphore(value: 1)
func downloadAndStore() async -> Result {
    let download = await downloadSemaphore.run {
        await loadWebResource("data.txt")
    }
    return await database.store(download)
}

Do you want to serialize downloads and serialize db accesses?

let downloadSemaphore = Semaphore(value: 1)
let databaseSemaphore = Semaphore(value: 1)
func downloadAndStore() async -> Result {
    let download = await downloadSemaphore.run {
        await loadWebResource("data.txt")
    }
    return await databaseSemaphore.run {
        await database.store(download)
    }
}

Do you want to serialize (download + db access) ?

let semaphore = Semaphore(value: 1)
func downloadAndStore() async -> Result {
    await semaphore.run {
        let download = await loadWebResource("data.txt")
        return await database.store(download)
    }
}

No thread is ever blocked. You choose what you want to serialize.

If the network can accept more than one concurrent download, you just raise the initial value of the download semaphore.

EDIT: a Semaphore(value: 1) is close to a queue. But beware: it does not provide any FIFO guarantee. As said above in the thread, FIFO requires streams, async sequences, some complex setup.

4 Likes

Just chiming in here to add my support. async/await are great. Actors are great. But we need something better for Actors that interact with other external systems.

I ran into this issue a few months ago and promised @ktoso on the Vapor Discord that I would mention it here.

I think it's telling that the illustrative example Apple uses to motivate Actors -- controlling writes to a database -- doesn't actually work if you care about keeping the state in the DB consistent with the in-memory version.

For my part, I kludged together a task-queue-type thing where my Actor processes one request at a time and the others await on it. I think my design should work, but I haven't put it into production yet because it's tricky code and TBH it's not critical right now. But this is exactly the kind of thing that advanced language features like Actors were intended to help with. Right?

Also it feels pretty weird to be writing something that is so clearly in conflict with the spirit (if not exactly the letter) of the rules for how Actor code should behave. But what choice do we have?

3 Likes

Got it, that's interesting. It has a similar pattern to @drewmccormack's Queue, where the Actor is used as a simple primitive to serialise access to some resource, similar to the old GCD primitives, whereas I think everyone's intuition had been to contain their protected resource inside some Actor.

Actors only protect their inner state between two awaits (suspension points). When we want to serialize full asynchronous jobs that themselves contain awaits (or to limit the number of concurrent jobs), as in your example involving network + database access, then actors alone are no longer enough, and it is important to see and understand that. Actors are not the solution for this kind of concurrency requirement.

The Semaphore actor above is indeed 100% inspired by DispatchSemaphore – I just implemented the documented DispatchSemaphore APIs and added the convenience run method. Unlike its GCD big brother, it does not block threads; it just awaits.

3 Likes

Yeah – this is great. Going to play around with this and see if I can get some sort of FIFO guarantee with the ideas from this and Drew's queue.

Sounds like an interesting exploration. Maybe make sure you do indeed need the FIFO guarantee (what breaks in your app if you don't have it?), and remember what a member of the Core Team said upthread about FIFOs: Actor Races - #28 by John_McCall

It's mainly curiosity. But as a previous example I've dealt with: The iOS Camera APIs recommend that you start and stop the capture session on a background queue as the configuration blocks and can take a while. So, you end up doing something along the lines of startCamera() on didBecomeActive and stopCamera() on didResignActive.

Now, if I'm understanding correctly, some quick task switching could mean that, if a start, stop, start is issued in close succession, I could actually get a start, start, stop, and the user is left looking at a blank camera feed.
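
Roughly, the hazard looks like this (CameraController and its methods are hypothetical stand-ins for the real AVCaptureSession work):

actor CameraController {
    private var isRunning = false

    func start() { isRunning = true; print("start") }
    func stop()  { isRunning = false; print("stop") }
}

let camera = CameraController()

// didBecomeActive / didResignActive firing in quick succession.
// The two unstructured Tasks are not guaranteed to reach the actor in the
// order they were created, so the calls can land on the session out of order.
Task { await camera.start() }
Task { await camera.stop() }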

If you find the Semaphore actor useful: I've updated the sample code above for more robustness. You can now create a semaphore with a negative value (which needs as many signals before tasks waiting in wait() can continue).

EDIT: :sweat_smile: A semaphore should not be initialized with a negative value. The sample code really needs to be reviewed!

EDIT 2: withUnsafeContinuation is a nasty, painful little immature method that yields before its block is run (well, it is documented to do so), and it is painful to use correctly. The above sample code does not work reliably in its current state. And it is surprisingly painful to come up with a robust implementation.

EDIT 3: AsyncStream was my next, failed attempt at replacing the unsafe continuations: Fatal error: attempt to await next() on more than one task, lol. The pain continues.

EDIT 4: Let's try with AsyncChannel? Well, it does not even compile (see screenshot). I've rarely used a more frustrating api, I give up :man_shrugging:. @Philippe_Hausler, if you have time, I'm curious about your way to implement a usable counting semaphore with Swift concurrency.

3 Likes