Trying to implement basic concurrent code with actors

The actor and async/await introduced into Swift recently seem to be a good thing, but even after trying and searching quite intensively for solution I am still not able to produce a simple example command line program combining actors with concurrent code. And a good documentation of how to implement concurrent code seems to be missing.

I will give a little longer example below, which I think this is necessary to explain the problem.

At the end I will point out to another issue, that "async" methods so-to-say can "pollute" a whole code base with sync/await.

Of course, why a specific code is not correct or why the aforementioned "pollution" is logical is somehow clear, but it is not clear (at least to me) what a sensible solution to the underlying problem could be.

Actors in Concurrent Code

To my understanding, an actor controls asynchronous acces to a resource (e.g. a variable):

Example:

We could have a method "say" without such a control, together with a function "work" that calls this method:

class Sayer {
    
    // non-exclusive access to a resource:
    func say(_ text: String) {
        print(text)
    }
}

func work(_ id: String) {
    for i in 1...3 {
        sayer.say("\(id) #\(i)")
        sleep(UInt32(Int.random(in: 1...2))) // simulating some real work
    }
}

We could also use an actor, so "say" can only be executed once in a time, and a caller becomes suspended until its call to to "say" actually executes:

actor AsyncSayer {
    
    // exclusive access to a resource needed:
    func say(_ text: String) {
        print(text)
    }
}

func workAsync(_ id: String) async {
    for i in 1...3 {
        await asyncSayer.say("\(id) #\(i)")
        sleep(UInt32(Int.random(in: 1...2))) // simulating some real work
    }
}

We could very well call "workAsync" without actually doing anything in parallel:

@main
struct AsyncTest1 {
    
    static func main() async {

        let workItems = ["A", "B"]
        
        await workItems.forEachAsync { workItem in

            // when we use workAsync instead, we get the following compiler error:
            // "'async' call in a function that does not support concurrency"
            await workAsync(workItem)
        }
    }
}

using

extension Sequence {
    func forEachAsync (
        _ operation: (Element) async throws -> Void
    ) async rethrows {
        for element in self {
            try await operation(element)
        }
    }
}

Of course, in such a context the definition of our actor is not really a sensible thing, since "say" is called only once at a time anyway.

Let's do some work concurrently:

@main
struct AsyncTest2 {
    
    static func main() async {
        
        let workItems = ["A", "B"]
        
        let group = DispatchGroup()
        
        workItems.forEach { workItem in
            group.enter()
            queue.async {
                
                // when we use "workAsync" instead, we get the following compiler error:
                // "'async' call in a function that does not support concurrency"
                work(workItem)
                
                group.leave()
            }
        }

        group.wait()
    }
}

Here, the work is done in a concurrent way, but using the version of "say" which does not (!) control access.

If we would use the version of "say" which does control access, we would have to call "workAsync", but then – as noted in the last code – the compiler says "'async' call in a function that does not support concurrency".

The asnyc/await-"pollution"

I hope that the word "pollution" does not sound too negative, as asnyc/await is a great solution to many problems. What do I mean by "pollution"? Well, when you have code that has to accessed by "await", this code itself can be suspended and has to be marked "async" and called using "await".

OK, this is really seems to be a logical consequence. But compare this to code manipulating state in the Haskell programming language. Haskell "in its pure form" is stateless. Using state anywhere inside a Haskell program that uses state "pollutes" the whole program with this state. But there exists the "State Monad" which can hide state from the rest of a program. I wish there would be such a thing for "async/await" in Swift. ... OK, this quite imprecise, but maybe you see what I mean and someone with a better understanding of these things can say a word about it.

Conclusion

As said at the beginning, some good documentation about the current (or future) state of concurrency in Swift, with working basic examples (!), seems to be missing. Or can anyone point out to a good documentation of this kind (serious question, I would really like to know)? If there is really documentation missing here, maybe some smart people could write one...

At least I would be very grateful if some someone could give me a working example. Thanks in advance!

1 Like

As I side note, I recommend you to use Task.sleep for those things. It works properly with the async system.

In the same way you shouldn't use dispatch for this. A task group it's more akin to what you are trying to do.

Having the entire call chain being async is not a big deal really, but it's true that the pollution exists (what's often referred as the "color" of the function). That said you can stop the pollution at any point by manually wrapping he async call in a task. In the toy example since is a CLI with an async main I don't see the reason to do it tho.

I'm sure there are plenty of resources there but maybe your question is too specific that there is no clear answer online.

Just to point to some well known community blogs:

If you have any concrete question feel free to ask, I'm sure we can find concrete explanations ^^

2 Likes

Thank you for your detailed answer.

I think my question is quite precise: I would like to run something in a command line tool in a concurrent way, and that should able to call use async methods in its inside (see my examples). I would be very thankful for a working example code.

OK. Could this be a solution to the problem explained in the first part? Seems a little bit bit over-constructed for my use case. But good to know. That answers my second part.

Also tried an example with a task group, but it did not help with my main issue. But also good to know, thanks.

Yes, I know those blogs (or most of them) and they are a wonderful resource. But I still think for an important topic like this a more complete documentation with working examples would be great. (Would write one myself, but as you can see I still do not understand everything.)

A second answer to that suggestion: I do not see how this would work, since I then would like to wait for the task to complete, which in turn makes the call async again. See the following code where it could be that nothing "happens" since we are not (!) waiting for the tasks to complete:

@main
struct AsyncTest2 {
    
    static func main() async {
        
        let workItems = ["A", "B"]
        
        let group = DispatchGroup()
        
        workItems.forEach { workItem in
            group.enter()
            queue.async {
                Task {
                    await workAsync(workItem)
                }
                
                group.leave()
            }
        }

        group.wait()
    }
}

Well, my real program is no toy program.

Sorry for not being more concrete but I'm still not really sure what you are trying to accomplish.

But you can already do that. Since main is async your entire program is in an async context you can freely call async functions.

I do think is the solution but probably I'm not understanding the problem.

Yes totally, if you need to wait for async work you need to be in an async context, but again, your main program is already async so you can do that already.


As I said, maybe I'm not understanding the problem but let me try to again anyway ;)

You have an actor that protects access to a resource (the spoke variable in this case)....

actor AsyncSayer {
    var spoke = 0
    func say(_ text: String) {
        spoke += 1
        print("Say: \(text).\t Spoke \(spoke) times.")
    }
}

Then you want to perform some piece of work (workAsync in this case)....

func workAsync(_ id: String, asyncSayer: AsyncSayer) async {
    for i in 1...3 {
        await asyncSayer.say("\(id) #\(i)")
        try? await Task.sleep(nanoseconds: 1 * NSEC_PER_SEC) // simulating some real work
    }
}

Note that I replaced the sleep with a proper Task.sleep.

Then I'm assuming the workItem is the input of your work, and that the array comes from some other place that makes it dynamic...

So since your "input" is variable you need to use a TaskGroup to perform async work concurrently. (if you know you just want to run 2 things concurrently you can use async let instead.)

await withTaskGroup(of: Void.self, body: { group in
  for item in workItems {
    group.addTask {
       await workAsync(item, asyncSayer: actor)
    }
  }
})
The full code
@main
enum App {
    static func main() async throws {
        let workItems = ["A", "B"]
        let actor = AsyncSayer()
        
        await withTaskGroup(of: Void.self, body: { group in
            for item in workItems {
                group.addTask {
                    await workAsync(item, asyncSayer: actor)
                }
            }
        })
    }
}

func workAsync(_ id: String, asyncSayer: AsyncSayer) async {
    for i in 1...3 {
        await asyncSayer.say("\(id) #\(i)")
        try? await Task.sleep(nanoseconds: 1 * NSEC_PER_SEC) // simulating some real work
    }
}

actor AsyncSayer {
    var spoke = 0
    func say(_ text: String) {
        spoke += 1
        print("Say: \(text).\t Spoke \(spoke) times.")
    }
}

Running this prints:

Say: B #1.	 Spoke 1 times.
Say: A #1.	 Spoke 2 times.
Say: B #2.	 Spoke 3 times.
Say: A #2.	 Spoke 4 times.
Say: B #3.	 Spoke 5 times.
Say: A #3.	 Spoke 6 times.

Which shows how each workAsync happens concurrently while keeping the access to the counter protected.

Isn't that what you want? ^^

in a well-architected swift application, async will propogate all the way up its call stack. the main function of @main will almost always be async. async is not throws, which is intended to be sequestered somewhere in the call stack, and you should not design your APIs as if async is just another color of throws.

this is bad practice. this concurrency is unstructured, its body and all captured variables must be @escaping, and the task cannot be cancelled or even awaited unless you are manually storing the Task instance.

  • if the caller does not perform concurrent work, mark it async and let it block without concurrency.

  • if the caller does perform concurrent work, and must synchronize with the callee at the end, use an async let and let it block with concurrency.

  • if the caller does perform concurrent work, but does not have to synchronize with the callee, factor the callee vertically out of the caller, and run it with an async let in the grand-caller. the caller is now a normal synchronous function.

await should be common but not very common in your code. (probably, around the same frequency as var or fileprivate.) beginners tend to smear await everywhere because the compiler suggests it as a cure for async. usually, people do this because they are trying to communicate something to an actor, so they await on the actor lock and then they wrap it in a free-floating Task so the whole caller doesn’t have to be marked async. what you actually want in these scenarios is an AsyncStream<T>, which you can yield(_:) to or finish without having to block against the stream’s consumer with the same priority.

in addition to the general AsyncStream<T>, familiarize yourself with the following patterns:

  • AsyncStream<Void>
  • AsyncStream<Never>
  • AsyncThrowingStream<Never, Failure>
1 Like

Ah, OK, this should to be what I want – but one question: I have up to 40000 work items, I suppose only a sensible number of tasks (i.e. threads) are created? Or do I have to work around it somehow, to make sure only a certain number off tasks are running in parallel? Can I somehow make explicit how many threads should be used?

(I have now seen that withTaskGroup is indeed mentioned in the Swift documentation, but without answering my last questions – there is a link to the TaskGroup documentation, but even there it is not explained. Before, I did not found anything about withTaskGroup because I actually searched for blog posts, and found nothing. So it is really not so easy. So thank you very much!)

The concurrency runtime handles the execution width for you automatically. It's usually the number of cores your system has, but no particular value is guaranteed, and they system doesn't yet allow customization of the underlying executor. Only testing can tell you if 40,000 items can execute efficiently. It should.

This simple executable takes about 8.6s (in debug and release mode) to create and execute 50,000 tasks in a group. Little actual CPU usage during that time, so it seems just enqueuing the work takes a while.

import CoreFoundation

@main
struct ConcurrencyTest {
    static func main() async {
        let start = CFAbsoluteTimeGetCurrent()
        await withTaskGroup(of: Void.self) { group in
            @Sendable
            func doNothing() {}
            
            for _ in 0..<50_000 {
                group.addTask {
                    doNothing()
                }
            }
            
            await group.waitForAll()
        }
        print(CFAbsoluteTimeGetCurrent() - start)
    }
}

While running it only used five threads, so there must be other considerations for execution width than just number of cores (this is an i9 iMac with 10/20 cores).

If I understand you correctly, then the solution would be not to block the resource, but to "inform" the resource (i.e. the interface to the resource) that something should be done (via a stream interface)? This is a solution I had thought about, but then I fear that too much resource work is accumulating and the change to the resource does not execute in a timely manner. In my use case (where the resource will be sparingly used in the common case) I would think that a blocking would be better (but I am not sure about that). But yes, all those "await" are then a downside. So I will look up your suggestions, thank you very much.

That's what I thought. It is not documented on Concurrency — The Swift Programming Language (Swift 5.5) as far as I can see, but I hope it is specified somewhere.

In my own testing (with real, although lightweight work done, including "await" calls for some resources, for 40184 work items), according to the log files 69 tasks were started before one task had finished, in the activation monitor I saw up to 31 threads used, that was on a M1 machine which has 4 + 4 cores.

In my own testing with the "lightweight" work mentioned, my program now needed 135 seconds instead of 270 seconds before. So the execution time was the half of the sequential version (but using approximately 5 times more memory). So the win for the execution time is less than I hoped for and I have the suspicion that too many threads are used. Maybe the suspending of threads (the "await" for the resource was the same as before) triggers the creation of new ones. So maybe it would be better if the program could have a little more control over those things.

You can kind of control the width of the TaskGroup by only enqueuing a certain number of tasks up front and then enqueuing more as some of that group completes.

import CoreFoundation

@main
struct ConcurrencyTest {
    static func main() async {
        let start = CFAbsoluteTimeGetCurrent()
        await withTaskGroup(of: Void.self) { group in
            @Sendable
            func doNothing() {}
            
            var tasksToPerform = 49_990
            for _ in 0..<10 {
                group.addTask {
                    doNothing()
                }
            }
            
            for await _ in group {
                if tasksToPerform > 0 {
                    group.addTask { doNothing() }
                    tasksToPerform -= 1
                }
            }
        }
        print(CFAbsoluteTimeGetCurrent() - start)
    }
}

Hilariously, in this case it reduced my overall runtime from ~8.6s to 0.059s.

5 Likes

Totally off topic but while scripting may not be the priority, the async test code can't be casually run with a script or Playgrounds (due to [SR-12683] `@main` is no longer usable due to misdetection of top level code - Swift) is extremely unfortunate.

1 Like

I learned long ago not to trust playgrounds or scripts due to how they differ from typical Swift execution. I just have a macOS command line tool project sitting around for these experiments.

2 Likes

In my case now around 80 seconds (with 4 to 10 active tasks). Not many tasks started before the first one ends. And the memory consumption stays low. Total concurrent threads used by the program seem to be around 15. Great!

Question: Does this always catches all (!) tasks? I.e. if one task gets finished before this statement, does still go into the for-loop?

It should, otherwise it would be pretty broken.

1 Like

I'm glad it helped ^^

That a question I'm still not sure if we have a good theoretical answer for. You can check out Swift concurrency: Behind the scenes. Of course trying it in practice is always the best solution, which I see this thread have been doing already ^^

Yes I totally agree. I'm well aware I was just trying to give options because I didn't fully understand what was the original question ^^

But I don't understand some of your points:

  1. How do these two things play together, they seem a bit contradictory :thinking:

I'm curious about it because I would love to know if I'm overusing await ^^

Can you elaborate on these points? Maybe point to some examples? I'm not sure I follow it.

And here you lost me completely :joy: It feels like actors are being dismissed but that can't be true no? Or are you just talking about scenarios where you want to communicate stuff to the actor without wanting a response? I would love to know more about these patterns you mention that use AsyncStreams. Is maybe this Communicating between two concurrent tasks an example of what you are referring to?

Thanks for the comments, I'm curious to learn more ^^

I'm not sure what @taylorswift is referring to, but I've found a few basic ways of structuring concurrent code so far:

  1. Loop and await:
for item in workItems {
  await doSomething(using: item)
}

Advantages: simple, minimal overhead.
Disadvantages: no concurrent execution, so no faster than simply calling a synchronous version of doSomething.

  1. Create Tasks and await:
let tasks = workItems.map { Task { await doSomething(using: $0) } }

for task in tasks {
  await task.value
}

Advantages: fully concurrent execution, preserves original ordering.
Disadvantages: must wait for all items to finish before you have output, creating all tasks up front can be slow

  1. Stream Tasks:
AsyncStream { continuation in
  Task {
    let tasks = self.map { item in
      Task { await doSomething(using: item) }
    }

    for task in tasks {
      continuation.yield(await task.value)
    }

    continuation.finish()
  }
}

Advantages: concurrent execution, results start as soon as one's available, order preserved
Disadvantages: creating tasks up front can be slow

There are various other ways to assemble the concurrency primitives to get the behavior you want. I also looked at using TaskGroup, and limiting its width, in another thread, so all of these strategies could be combined.

6 Likes

Are you sure about that? The documentation says:

To create an unstructured task that runs on the current actor, call the Task.init(priority:operation:) initializer.

This would mean that all your tasks run on the same actor.

Terms of Service

Privacy Policy

Cookie Policy