Actor Races

I just stumbled onto this thread, but it touches on so many things I've also struggled with I had to chime in.

While migrating my app from Operation to async/await, I ran it very similar problems with queuing. My implementation of a TaskQueue is here:

5 Likes

Playing around with some of the ideas in this thread, I've come up with a solution for FIFO access which seems to play quite nicely. Not fully tested but the general idea is:

public final class AsyncSerialResource<Resource> {
    
    public init(_ resource: Resource)
    public func perform(_ action: @escaping (Resource) async -> Void)    
}

And you use it like this:

let serialResources = AsyncSerialResource(
    Resources(
        imageDownloader: ImageDownloader(),
        imageCache: ImageCache()
    )
)

for i in 0..<5 {
    protectedResource.perform { resources in
        await resources.imageDownloader.getImage(forID: i)
        await resources.imageCache.setImage(image, forID: i)
    }
}

It's also possible to add blocking 'performAndWait' style calls:

public func performAndWait<T: Sendable>(_ action: @escaping (Resource) async -> T) async -> T    
public func throwingPerformAndWait<T: Sendable>(
    _ action: @escaping (Resource) async throws -> T
) async throws -> T

But as the caller would be spawning a task to use them, I'm not sure how useful they are in practice.

Implementation:

public final class AsyncSerialResource<Resource> {
    
    private let queuedActions: AsyncStream<() async -> Void>.Continuation
    private let resource: Resource
    
    public init(_ resource: Resource) {
        self.resource = resource
        let (stream, continuation)
            = AsyncStream<() async -> Void>.withExtractedContinuation((() async -> Void).self)
        self.queuedActions = continuation
        Task {
            for await action in stream {
                await action()
            }
        }
    }
    
    public func perform(_ action: @escaping (Resource) async -> Void) {
        queuedActions.yield({ [resource] in await action(resource) })
    }
    
    public func performAndWait<T: Sendable>(_ action: @escaping (Resource) async -> T) async -> T {
        let (stream, continuation) = AsyncStream<T>.withExtractedContinuation(T.self)
        queuedActions.yield { [resource] in
            let result = await action(resource)
            continuation.yield(result)
        }
        let result = await stream.first { _ in true }!
        return result
    }
    
    public func throwingPerformAndWait<T: Sendable>(
        _ action: @escaping (Resource) async throws -> T
    ) async throws -> T {
        let (stream, continuation)
            = AsyncStream<Result<T, Error>>.withExtractedContinuation(Result<T, Error>.self)
        queuedActions.yield { [resource] in
            do {
                let result = try await action(resource)
                continuation.yield(.success(result))
            }
            catch let error {
                continuation.yield(.failure(error))
            }
        }
        let result = await stream.first { _ in true }!
        return try result.get()
    }
}

fileprivate extension AsyncStream {
    
    static func withExtractedContinuation<T: Sendable>(
        _ type: T.Type
    ) -> (AsyncStream<T>, AsyncStream<T>.Continuation) {
        var extractedContinuation: AsyncStream<T>.Continuation! = nil
        let stream = AsyncStream<T>(T.self, bufferingPolicy: .unbounded) { continuation in
            extractedContinuation = continuation
        }
        return (stream, extractedContinuation)
    }
}
2 Likes

I think this is the deepest question here. There's a potential language design that I've tossed around a bit as a sort of dual to the actor design, which I've taken to calling reactors. A reactor is basically what you've got as AsyncSerialResource: internally there's a task, and method calls on the reactor always enqueue something to execute on the task, and that queue is processed FIFO. Since method calls are just enqueues, they can't return anything, and the "call" cannot suspend and therefore does not need to be awaited, even if the method is declared async (dual to how actor calls can suspend and therefore need to be awaited even if the method is not async). Operations on the reactor that suspend for a long time will hold up progress on the reactor, but since calls to the reactor never block the caller, this doesn't itself admit deadlocks into the system.

Reactors do raise some thorny questions about structured concurrency, and those tie in pretty closely with the question of whether you can await the completion of something on a reactor. If a reactor is really an independent system that's just reacting to events, then its existence doesn't undermine structured concurrency in any way, because the work of that reaction ought to be detached from whatever caused it — the work is not on behalf of the caller's task. In that case, it's fine that information can't flow back to the caller, that the caller's priority doesn't propagate to the reaction, that the reaction isn't cancelled by cancelling the calling task, and so on. But if the reactor's operation is really part of the original operation, then it ought to run as part of the caller's task, and it very plausibly needs information to be able to flow back to the caller. And if you try to fake that up by, say, passing a completion handler to a reactor method (and maybe even having that completion handler resume a continuation), then you really are completely subverting structured concurrency, as well as readmitting deadlocks to the system.

So the name "reactor" is architecturally significant here, and this kind of design shouldn't be used for something that doesn't have the characteristics of a detached reaction. I'm very curious to see if that's okay and if it still solves problems for people under that constraint.

12 Likes

I guess what may confuse people, is that actor, at least as defined in the most popular actor systems (such as erlang), uses the metaphor of message sending and mailbox, which, by default, work like a fifo queue.
It's going to be a very long road to have people realize that swift actors, despite using the very same concept name, make the most fundamental expectation about function call behavior wrong.

3 Likes

That has recently been changed to no longer have that behavior; it now is consuming (which may not be what you want to do here for a semaphore-ish behavior).

Those compile failures look like it is using the wrong version of Xcode. To build that project you need to have the most recent version to build it; because it uses features that are not in previous versions (specifically Clock et al).

AsyncChannel has perhaps that "wait from both sides" type thing that you would want. The rub of course for reducing this down is that the code to do that is kinda tricky. Effectively you need to have a continuation that produces a continuation. Which that ends up being a very gnarly state machine to do that. Don't get me wrong... it isn't impossible... but it is definitely a mind-bender.

If I get some spare time to noodle about how to implement that I will pop back here with an example. I may actually have something in my sketches folder (because it was similar to some of the testing requirements for the async-algos package).

Got it. That makes a lot of sense and confirms my understanding.

I think the idea of a Reactor where calls don't and can't suspend, is definitely useful in itself. However, it would be even more useful with the dual of working out how information can be returned back to the callee.

Dare I say it, but in something like Elm, you have the concept of 'commands' that are issued as the output of a reducer to go off and perform side-effects asynchronously – this seems kind of analogous to that. The way they handle getting data back to the callee is that, optionally, a callback can be bundled with the 'command' that causes another run through the reducer with the result. Maybe there's a Swift like way we can achieve this without the callbacks.

Thinking out loud: perhaps something along the lines of send<T: Sendable>(_ action: @escaping (Resource) async -> T, receive: @escaping (T) -> Void) If it also blocked the Reactor from performing any more tasks until called, would it effectively create a linear contract between caller and callee?

EDIT1: add missing receive param on send method signature.
EDIT2: Actually, it doesn't need to be a callback I don't think.

That certainly sounds like a linear contract that would be more naturally expressed as an awaited call.

I found a small sketch of something that kinda does what you might be interested in; this was an early design in some of the algorithms (and I could see where perhaps it might be something that could be massaged into a more production worthy concept; tbh the name is kinda awful, but I did update it to use the new shiny locking stuff added in most recently)

I may be missing something, but would an awaited call not require wrapping in a Task, and therefore our order isn't guaranteed and we're back at square one?

I don't know if this is a good example or not, but in the case of say:

final class SomeObservable: ObservableObject {
    
    @Published var latestImage: Image?
    @Published var latestSerialImage: Image?
    
    private let loader = ImageLoader()
    private let protectedLoader = AsyncSerialResource(ImageLoader())
    
    func nextImage() {
        Task {
            let image = await loader.getNextImageOverNetwork()
            self.latestImage = image
        }
    }
    
    func nextSerialImage() {
        protectedLoader.send { loader in
            await loader.getNextImageOverNetwork()
        } receive: { image in
            self.latestSerialImage = image
        }
    }
}

actor ImageLoader {
   // some long image load process
    func getNextImageOverNetwork() async -> Image {  ... }
}

Calling nextImage may result in an older image overwriting a newer image on the latestImage property, whereas calling nextSerialImage would guarantee they arrive in order on the latestSerialImage property. (I think.)

Am I missing something obvious? Is there a better way to express this?

I’m just saying that if you’re doing work on behalf of something that you communicate back to, I want you to be using structured concurrency.

That doesn’t mean you can’t end up with more-or-less cyclic lines of communication with reactors that send events to other reactors, just that if you think of it as returning a result to the caller, that should probably be structured.

I've also had need for non-reentrant actors or (equivalently) async-friendly locks. Most of it would be addressed by the described reactor design, but sometimes you do need to make a call and wait for the result. I've generally hacked around the issue by awaiting on a separate task stream and using continuations (Substrate/TaskStream.swift at main · troughton/Substrate · GitHub; there's a bunch of unsafe code in there that's needed to get the correct semantics).

To give an example use case: consider a renderer that returns an image. The renderer uses needs to be use async calls in its implementation (e.g. to asynchronously load in assets). It also reuses prior rendered frames; that means there needs to be FIFO sequencing and shared state. I don't see a way to support that without preventing reentrancy.

In practice, the fact that the implementation could deadlock is a non-issue, because the async calls are isolated to different domains; the renderer will make its own async calls but never call back to code that could submit another render.

Right, so in this contrived example, having some AsyncSequence with its source within the ImageLoader that delivers the images serially. Yeah, definitely my preference too. Will see how I go. I feel I have a better understanding of it now, anyhow. Thanks!

1 Like

Thank you @Philippe_Hausler, I'll carefully look at your handling of unsafe continuations :+1:

OK, so maybe I have something useful here. It extends the idea of an AsyncSerialResource into something which allows responses to be sent back to the caller serially, all using structured concurrency. Using trendy parlance, it creates a unidirectional data-flow between caller and callee.

I borrowed the name Reactor as it seems to fit better. Here's the outline:

public final class AsyncReactor<Context, Output: Sendable>: AsyncSequence {
    public init(_ context: Context, outputType output: Output.Type)    
    public func perform( _ action: @Sendable @escaping (Context, @Sendable @escaping (Output) -> Void) async -> Void)
}

And, it is itself an AsyncSequence which allows two-way communication between caller and callee while maintaining a linear communication contract.

Here's a contrived example of how you might use it:

final class SomeObservable: ObservableObject {
    
    enum State: Sendable { case loading, loaded(Image), failure(Error?) }
    
    @Published private (set) var state = State.loading
    
    private let loader = AsyncReactor(ImageLoader(), outputType: State.self)
    
    init() {
        // All responses from the Reactor are received and processed serially as there's just one long
        // running task processing the output, and one long running task inside the Reactor processing
        // the input that is throttled by the throughput of the output processing task. This creates a
        // uni-directional flow of data.
        Task {
            for await reaction in loader {
                // I think we can even do some async thing here which would suspend execution in the
                // Reactor until we complete this cycle of the loop?
                await someAsyncThing()
                self.state = reaction
            }
        }
    }
    
    func someAsyncThing() async {}
    
    func nextSerialImage() {
        // calls to the Reactor are guaranteed to be called in serial order and without interlaving
        // between suspension points
        loader.perform { loader, send in
            // That means we can update the caller as we make forward progress and it should work
            // as you'd expect.
            await send(.loading)
            do {
                let image = try await loader.getNextImageOverNetwork()
                await send(.loaded(image))
            }
            catch let error {
                await send(.failure(error))
            }
        }
    }
}

// I'm not sure Image will end up being Sendable, but let's pretend for the sake of example
extension Image: @unchecked Sendable {}

The 'context' here holds one resource, but you could wrap multiple resources in a type (database, image cache, etc.) and pass that as the context, it would work as expected.

Implementation:


// For multicast support, we lean on the AsyncChannel type from Apple's open
// source AsyncAlgorithms package.
import AsyncAlgorithms

public final class AsyncReactor<Context, Output: Sendable> {
    
    private let context: Context
    private let sendInput: AsyncStream<() async -> Void>.Continuation
    private let outputChannel = AsyncChannel(element: Output.self)
    
    public init(_ context: Context, outputType output: Output.Type) {
        self.context = context
        let (inputStream, input)
            = AsyncStream<() async -> Void>.withExtractedContinuation((() async -> Void).self)
        self.sendInput = input
        Task {
            for await action in inputStream {
                await action()
            }
        }
    }
    
    public func perform(
        _ action: @Sendable @escaping (Context, @Sendable @escaping (Output) async -> Void) async -> Void
    ) {
        sendInput.yield { [context, outputChannel] in
            await action(context) { output in
                await outputChannel.send(output)
            }
        }
    }
}

extension AsyncReactor: AsyncSequence {
    
    public typealias AsyncIterator = AsyncChannel<Output>.Iterator
    public typealias Element = Output
    
    public func makeAsyncIterator() -> AsyncChannel<Output>.Iterator {
        outputChannel.makeAsyncIterator()
    }
}

fileprivate extension AsyncStream {
    
    static func withExtractedContinuation<T: Sendable>(
        _ type: T.Type
    ) -> (AsyncStream<T>, AsyncStream<T>.Continuation) {
        var extractedContinuation: AsyncStream<T>.Continuation! = nil
        let stream = AsyncStream<T>(T.self, bufferingPolicy: .unbounded) { continuation in
            extractedContinuation = continuation
        }
        return (stream, extractedContinuation)
    }
}

What's great is that with the multicast support from AsyncChannel, it should be viable to share the Reactor around without issues. When you're in a 'perform' block, you have exclusive access to the reactor context for the duration that the call is executing.

You could, for example, wrap it in an ObservableObject, and share it with multiple SwiftUI views all receiving updates and being able to perform operations serially without issue.

Chiming in here again now I have a better understanding:

Unfortunately I don't think this is guaranteed to run serially after all. Every time you create a task via the Task initialiser you're effectively creating a parallel computation. There's no guarantee which one will be plucked to be executed first, or finish first.

Same again here, the Task initialiser within the if block is the clue that we're about to spawn off a parallel computation that may or may not start – or finish – in order.

What does work is having a single Task that plucks off jobs from a serial AsyncSequence. And with some coaxing we can even get two Tasks communicating serially to provide two-way serial communication between caller and callee. I've included an example implementation of that in my post above.

The thing is, which tasks win here does not matter. The order that the Tasks run should not affect the outcome. The ordering of the queue is dictated by the array of async blocks (confusingly called tasks). That doesn't change. The tasks are just used to make sure the queue doesn't stall, by checking if there is still an async block to be executed, and doing them in FIFO order.

Yes, I see, you're absolutely right. Too much concurrency scrambles the brain!

Sorry, spoke too soon. Concurrency is hard. The issue is still there, it's just now been pushed up a level to the caller:

This is now the key part. Assuming you're calling this like so:

let queue = Queue()
Task { await queue.enque { /* some task */ } }

Then, its these tasks that are now running in parallel and of indeterminate order. They're ordered once they reach your Queue, it's just that we don't know what order they'll reach there. (Assuming you're using the API like above.)

1 Like

That is certainly true. But that wasn't really my objective to begin with. For me, the important part was that the function downloadAndStore should finish in its entirety before another call to it is handled. I wanted to avoid a race due to interleaving. I think the Queue does that, though it certainly doesn't make any guarantees about which submission runs first.

One would hope that submissions are handled roughly in order, which is basically what I was after.

It seems you are attempting to make something that guarantees FIFO order for the callers. That is a step further than what I was after, but a worthy goal.

2 Likes

Please excuse any silliness due to not fully comprehending the discussion to-date, but here goes...

It seems to me that, unless all callers are running on the same thread, there is never a guarantee as to submission order, for any queue. If it's important for a single call-site to issue two separate submissions, in a specific order, it would be up to that call site to nest them appropriately.

Task {
  await queue.enque { task1() }
  await queue.enque { task2() }
}

and not

Task { await queue.enque { task1() } }
Task { await queue.enque { task2() } }