Trying to implement basic concurrent code with actors

Thank you for your detailed answer.

I think my question is quite precise: I would like to run something in a command line tool in a concurrent way, and it should be able to call async methods internally (see my examples). I would be very thankful for working example code.

OK. Could this be a solution to the problem explained in the first part? It seems a little bit over-constructed for my use case. But good to know. That answers my second part.

Also tried an example with a task group, but it did not help with my main issue. But also good to know, thanks.

Yes, I know those blogs (or most of them) and they are a wonderful resource. But I still think for an important topic like this a more complete documentation with working examples would be great. (Would write one myself, but as you can see I still do not understand everything.)

A second answer to that suggestion: I do not see how this would work, since I then would like to wait for the task to complete, which in turn makes the call async again. See the following code where it could be that nothing "happens" since we are not (!) waiting for the tasks to complete:

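import Foundation

@main
struct AsyncTest2 {
    
    static func main() async {
        
        let workItems = ["A", "B"]
        
        let group = DispatchGroup()
        let queue = DispatchQueue.global() // assumed; `queue` was not defined in the original snippet
        
        workItems.forEach { workItem in
            group.enter()
            queue.async {
                // The Task is created here but never awaited ...
                Task {
                    await workAsync(workItem)
                }
                
                // ... so leave() is called right away, possibly before the work has run.
                group.leave()
            }
        }

        group.wait()
    }
}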

Well, my real program is no toy program.

Sorry for not being more concrete but I'm still not really sure what you are trying to accomplish.

But you can already do that. Since main is async, your entire program is in an async context, so you can freely call async functions.

I do think this is the solution, but probably I'm not understanding the problem.

Yes totally, if you need to wait for async work you need to be in an async context, but again, your main program is already async so you can do that already.


As I said, maybe I'm not understanding the problem, but let me try again anyway ;)

You have an actor that protects access to a resource (the spoke variable in this case)....

actor AsyncSayer {
    var spoke = 0
    func say(_ text: String) {
        spoke += 1
        print("Say: \(text).\t Spoke \(spoke) times.")
    }
}

Then you want to perform some piece of work (workAsync in this case)....

func workAsync(_ id: String, asyncSayer: AsyncSayer) async {
    for i in 1...3 {
        await asyncSayer.say("\(id) #\(i)")
        try? await Task.sleep(nanoseconds: 1 * NSEC_PER_SEC) // simulating some real work
    }
}

Note that I replaced the sleep with a proper Task.sleep.

Then I'm assuming the workItem is the input of your work, and that the array comes from some other place that makes it dynamic...

So since your "input" is variable you need to use a TaskGroup to perform async work concurrently. (if you know you just want to run 2 things concurrently you can use async let instead.)

await withTaskGroup(of: Void.self, body: { group in
  for item in workItems {
    group.addTask {
       await workAsync(item, asyncSayer: actor)
    }
  }
})

The full code:

import Foundation

@main
enum App {
    static func main() async throws {
        let workItems = ["A", "B"]
        let actor = AsyncSayer()
        
        await withTaskGroup(of: Void.self, body: { group in
            for item in workItems {
                group.addTask {
                    await workAsync(item, asyncSayer: actor)
                }
            }
        })
    }
}

func workAsync(_ id: String, asyncSayer: AsyncSayer) async {
    for i in 1...3 {
        await asyncSayer.say("\(id) #\(i)")
        try? await Task.sleep(nanoseconds: 1 * NSEC_PER_SEC) // simulating some real work
    }
}

actor AsyncSayer {
    var spoke = 0
    func say(_ text: String) {
        spoke += 1
        print("Say: \(text).\t Spoke \(spoke) times.")
    }
}

Running this prints:

Say: B #1.	 Spoke 1 times.
Say: A #1.	 Spoke 2 times.
Say: B #2.	 Spoke 3 times.
Say: A #2.	 Spoke 4 times.
Say: B #3.	 Spoke 5 times.
Say: A #3.	 Spoke 6 times.

Which shows how each workAsync happens concurrently while keeping the access to the counter protected.

Isn't that what you want? ^^
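
For the fixed-count case I mentioned in passing (async let instead of a TaskGroup), a minimal sketch could look like this. The names describe and runTwoConcurrently are made up for illustration, and the sleep only stands in for real work.

```swift
// Hypothetical stand-in for a piece of async work.
func describe(_ id: String) async -> String {
    // Simulate some work; a cancelled sleep is simply ignored here.
    try? await Task.sleep(nanoseconds: 100_000_000)
    return "done \(id)"
}

// Runs exactly two pieces of work concurrently and waits for both.
func runTwoConcurrently() async -> [String] {
    async let first = describe("A")   // starts right away as a child task
    async let second = describe("B")  // runs concurrently with `first`
    return await [first, second]      // suspends until both have finished
}
```

Since the number of async let bindings is fixed at compile time, this only fits when you know up front how many things run side by side. For a dynamic array of work items the TaskGroup version is the one to use.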

in a well-architected swift application, async will propagate all the way up its call stack. the main function of @main will almost always be async. async is not throws, which is intended to be sequestered somewhere in the call stack, and you should not design your APIs as if async is just another color of throws.

this is bad practice. this concurrency is unstructured, its body and all captured variables must be @escaping, and the task cannot be cancelled or even awaited unless you are manually storing the Task instance.

  • if the caller does not perform concurrent work, mark it async and let it block without concurrency.

  • if the caller does perform concurrent work, and must synchronize with the callee at the end, use an async let and let it block with concurrency.

  • if the caller does perform concurrent work, but does not have to synchronize with the callee, factor the callee vertically out of the caller, and run it with an async let in the grand-caller. the caller is now a normal synchronous function.

await should be common but not very common in your code. (probably, around the same frequency as var or fileprivate.) beginners tend to smear await everywhere because the compiler suggests it as a cure for async. usually, people do this because they are trying to communicate something to an actor, so they await on the actor lock and then they wrap it in a free-floating Task so the whole caller doesn’t have to be marked async. what you actually want in these scenarios is an AsyncStream<T>, which you can yield(_:) to or finish without having to block against the stream’s consumer with the same priority.

in addition to the general AsyncStream<T>, familiarize yourself with the following patterns:

  • AsyncStream<Void>
  • AsyncStream<Never>
  • AsyncThrowingStream<Never, Failure>
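
a rough sketch of the stream idea, under some assumptions: EventLog, produce(into:) and runStreamDemo are hypothetical names, and the stream uses the default unbounded buffering. the point is that the producer is an ordinary synchronous function that yields without any await.

```swift
// Hypothetical consumer: an actor that records every message it receives.
actor EventLog {
    private(set) var received: [String] = []

    func consume(_ stream: AsyncStream<String>) async {
        for await message in stream {
            received.append(message)
        }
    }
}

// A synchronous producer: it yields without `await` and is not `async`.
func produce(into continuation: AsyncStream<String>.Continuation) {
    for i in 1...3 {
        continuation.yield("event \(i)")
    }
    continuation.finish()  // lets the consumer's loop end
}

func runStreamDemo() async -> [String] {
    // The build closure runs synchronously inside the initializer,
    // so the continuation is set by the time the initializer returns.
    var continuation: AsyncStream<String>.Continuation!
    let stream = AsyncStream<String> { continuation = $0 }

    let log = EventLog()
    produce(into: continuation)  // buffers three events, then finishes
    await log.consume(stream)    // drains the buffered events
    return await log.received
}
```

with unbounded buffering nothing is dropped, but nothing slows the producer down either, so a slow consumer means a growing buffer.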

Ah, OK, this should be what I want. But one question: I have up to 40000 work items. I suppose only a sensible number of tasks (i.e. threads) are created? Or do I have to work around it somehow, to make sure only a certain number of tasks are running in parallel? Can I somehow make explicit how many threads should be used?

(I have now seen that withTaskGroup is indeed mentioned in the Swift documentation, but without answering my last questions. There is a link to the TaskGroup documentation, but even there it is not explained. Before, I did not find anything about withTaskGroup because I actually searched for blog posts, and found nothing. So it is really not so easy. So thank you very much!)

The concurrency runtime handles the execution width for you automatically. It's usually the number of cores your system has, but no particular value is guaranteed, and the system doesn't yet allow customization of the underlying executor. Only testing can tell you if 40,000 items can execute efficiently. It should.

This simple executable takes about 8.6s (in debug and release mode) to create and execute 50,000 tasks in a group. Little actual CPU usage during that time, so it seems just enqueuing the work takes a while.

import CoreFoundation

@main
struct ConcurrencyTest {
    static func main() async {
        let start = CFAbsoluteTimeGetCurrent()
        await withTaskGroup(of: Void.self) { group in
            @Sendable
            func doNothing() {}
            
            for _ in 0..<50_000 {
                group.addTask {
                    doNothing()
                }
            }
            
            await group.waitForAll()
        }
        print(CFAbsoluteTimeGetCurrent() - start)
    }
}

While running it only used five threads, so there must be other considerations for execution width than just number of cores (this is an i9 iMac with 10/20 cores).

If I understand you correctly, then the solution would be not to block the resource, but to "inform" the resource (i.e. the interface to the resource) that something should be done (via a stream interface)? This is a solution I had thought about, but then I fear that too much resource work is accumulating and the change to the resource does not execute in a timely manner. In my use case (where the resource will be sparingly used in the common case) I would think that a blocking would be better (but I am not sure about that). But yes, all those "await" are then a downside. So I will look up your suggestions, thank you very much.

That's what I thought. It is not documented in The Swift Programming Language as far as I can see, but I hope it is specified somewhere.

In my own testing (with real, although lightweight work done, including "await" calls for some resources, for 40184 work items), according to the log files 69 tasks were started before one task had finished. In Activity Monitor I saw up to 31 threads used. That was on an M1 machine, which has 4 + 4 cores.

In my own testing with the "lightweight" work mentioned, my program now needed 135 seconds instead of 270 seconds before. So the execution time was half that of the sequential version (but using approximately 5 times more memory). So the win in execution time is less than I hoped for, and I suspect that too many threads are used. Maybe the suspending of threads (the "await" for the resource was the same as before) triggers the creation of new ones. So maybe it would be better if the program could have a little more control over those things.

You can kind of control the width of the TaskGroup by only enqueuing a certain number of tasks up front and then enqueuing more as some of that group completes.

import CoreFoundation

@main
struct ConcurrencyTest {
    static func main() async {
        let start = CFAbsoluteTimeGetCurrent()
        await withTaskGroup(of: Void.self) { group in
            @Sendable
            func doNothing() {}
            
            var tasksToPerform = 49_990
            for _ in 0..<10 {
                group.addTask {
                    doNothing()
                }
            }
            
            for await _ in group {
                if tasksToPerform > 0 {
                    group.addTask { doNothing() }
                    tasksToPerform -= 1
                }
            }
        }
        print(CFAbsoluteTimeGetCurrent() - start)
    }
}

Hilariously, in this case it reduced my overall runtime from ~8.6s to 0.059s.
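
The same idea can be wrapped in a small helper. This is only a sketch: the name forEachLimited and its parameters are made up for illustration and are not part of the standard library.

```swift
// Hypothetical helper: runs `operation` for every item, keeping at most
// `maxConcurrent` child tasks in the group at any time.
func forEachLimited<Item: Sendable>(
    _ items: [Item],
    maxConcurrent: Int,
    operation: @escaping @Sendable (Item) async -> Void
) async {
    precondition(maxConcurrent > 0, "maxConcurrent must be positive")
    await withTaskGroup(of: Void.self) { group in
        var iterator = items.makeIterator()

        // Seed the group with the first batch of tasks.
        for _ in 0..<maxConcurrent {
            guard let item = iterator.next() else { break }
            group.addTask { await operation(item) }
        }

        // Each time one task finishes, start the next one (if any is left).
        for await _ in group {
            if let item = iterator.next() {
                group.addTask { await operation(item) }
            }
        }
    }
}
```

Usage would be something like `await forEachLimited(workItems, maxConcurrent: 10) { item in await process(item) }`, where process is whatever your per-item work is. Note that this limits the number of tasks in the group, not the number of threads; the thread count is still up to the runtime.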


Totally off topic, but while scripting may not be the priority, the fact that the async test code can't be casually run as a script or in Playgrounds (due to [SR-12683] `@main` is no longer usable due to misdetection of top level code · Issue #55127 · apple/swift · GitHub) is extremely unfortunate.


I learned long ago not to trust playgrounds or scripts due to how they differ from typical Swift execution. I just have a macOS command line tool project sitting around for these experiments.


In my case now around 80 seconds (with 4 to 10 active tasks). Not many tasks started before the first one ends. And the memory consumption stays low. Total concurrent threads used by the program seem to be around 15. Great!

Question: Does this always catch all (!) tasks? I.e. if one task has finished before this statement is reached, is its result still delivered to the for-loop?

It should, otherwise it would be pretty broken.
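
If you want to convince yourself, a small check along these lines might help. It is only a sketch: countDeliveredResults is a made-up name, and the sleep is just there so that the child tasks have very likely finished before the loop starts.

```swift
// Counts how many results `for await` delivers when the child tasks
// have most likely completed before the loop is reached.
func countDeliveredResults(taskCount: Int) async -> Int {
    await withTaskGroup(of: Int.self) { group in
        for i in 0..<taskCount {
            group.addTask { i }  // finishes almost immediately
        }

        // Give the child tasks time to complete before iterating.
        try? await Task.sleep(nanoseconds: 200_000_000)

        var delivered = 0
        for await _ in group {
            delivered += 1
        }
        return delivered
    }
}
```

The group holds on to the results of finished child tasks until they are consumed, so the count should equal taskCount no matter when the tasks finish relative to the loop.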


I'm glad it helped ^^

That's a question I'm still not sure we have a good theoretical answer for. You can check out Swift concurrency: Behind the scenes. Of course trying it in practice is always the best solution, which I see this thread has been doing already ^^

Yes, I totally agree. I'm well aware; I was just trying to give options because I didn't fully understand what the original question was ^^

But I don't understand some of your points:

  1. How do these two things play together, they seem a bit contradictory :thinking:

I'm curious about it because I would love to know if I'm overusing await ^^

Can you elaborate on these points? Maybe point to some examples? I'm not sure I follow it.

And here you lost me completely :joy: It feels like actors are being dismissed, but that can't be true, no? Or are you just talking about scenarios where you want to communicate stuff to the actor without wanting a response? I would love to know more about these patterns you mention that use AsyncStreams. Is the thread "Communicating between two concurrent tasks" maybe an example of what you are referring to?

Thanks for the comments, I'm curious to learn more ^^

I'm not sure what @taylorswift is referring to, but I've found a few basic ways of structuring concurrent code so far:

  1. Loop and await:
for item in workItems {
  await doSomething(using: item)
}

Advantages: simple, minimal overhead.
Disadvantages: no concurrent execution, so no faster than simply calling a synchronous version of doSomething.

  2. Create Tasks and await:
let tasks = workItems.map { Task { await doSomething(using: $0) } }

for task in tasks {
  await task.value
}

Advantages: fully concurrent execution, preserves original ordering.
Disadvantages: must wait for all items to finish before you have output, creating all tasks up front can be slow.

  3. Stream Tasks:
AsyncStream { continuation in
  Task {
    let tasks = workItems.map { item in
      Task { await doSomething(using: item) }
    }

    for task in tasks {
      continuation.yield(await task.value)
    }

    continuation.finish()
  }
}

Advantages: concurrent execution, results start as soon as one's available, order preserved
Disadvantages: creating tasks up front can be slow

There are various other ways to assemble the concurrency primitives to get the behavior you want. I also looked at using TaskGroup, and limiting its width, in another thread, so all of these strategies could be combined.
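
As one example of combining them, here is a sketch of a TaskGroup variant that still returns results in the original order. The body of doSomething(using:) is a made-up stand-in, and mapConcurrently is a hypothetical name.

```swift
// Hypothetical stand-in for the real per-item work.
func doSomething(using item: Int) async -> Int {
    item * item
}

// Runs all items concurrently in a task group, then restores input order.
func mapConcurrently(_ workItems: [Int]) async -> [Int] {
    await withTaskGroup(of: (Int, Int).self) { group in
        // Tag every result with the index of its input.
        for (index, item) in workItems.enumerated() {
            group.addTask { (index, await doSomething(using: item)) }
        }

        // Results arrive in completion order; the index puts them back in place.
        var results = [Int?](repeating: nil, count: workItems.count)
        for await (index, value) in group {
            results[index] = value
        }
        return results.compactMap { $0 }
    }
}
```

Advantages: structured concurrency, so the child tasks are awaited and cancelled together with the group; order preserved.
Disadvantages: like strategy 2, nothing is returned until every item has finished.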


Are you sure about that? The documentation says:

To create an unstructured task that runs on the current actor, call the Task.init(priority:operation:) initializer.

This would mean that all your tasks run on the same actor.

These examples aren't in an actor context. Even if they were that doesn't necessarily mean they don't run concurrently, or even in parallel. If you're not touching actor state it seems to execute just like the non-actor case. If you are, you pay the synchronization overhead, but the rest of the execution can happen concurrently.

Overall, I've found that the best way to find an execution strategy for your work is to test various implementations until you find the fastest one, including simply throwing synchronous work onto a single Task.


This method works fine as long as the processed data does not get too large and/or the call chain that follows too complicated, in which case I get a nasty EXC_BAD_ACCESS error, see the code below: just wrapping the call into group.addTask produces the error. Any guess? (Thanks.) (In the full code I then do the for await _ in group, same problem then.)

        print("------- 1: OK")
        
        let run1 = Items(files)

        if #available(macOS 10.15, *) {
            await withTaskGroup(of: Void.self) { group in
                if let file = run1.next {
                    print("processing \(file.path)...")
                    do {
                        try await processFile(file: file)
                    }
                    catch {
                        print("ERROR: \(error.localizedDescription)")
                    }
                }
            }
        } else {
            print("wrong OS version")
        }
        
        print("press return...")
        _ = readLine()
        print("continuing...")
        
        print("------- 2: Thread 2: EXC_BAD_ACCESS (code=2, address=...)")
        
        let run2 = Items(files)

        if #available(macOS 10.15, *) {
            await withTaskGroup(of: Void.self) { group in
                if let file = run2.next {
                    print("processing \(file.path)...")
                    group.addTask {
                        do {
                            try await processFile(file: file)
                        }
                        catch {
                            print("ERROR: \(error.localizedDescription)")
                        }
                    }
                }
            }
        } else {
            print("wrong OS version")
        }

UPDATE: I added a demo project to show this error as GitHub - stefanspringer1/BadAccessDemo. Maybe someone has an idea... Thanks.

UPDATE 2: In my "real" project I managed to avoid the EXC_BAD_ACCESS error by keeping track of the data outside of the await withTaskGroup. Else, the data I am working on disappears and I get the error. But 1) I then do not know where I should remove the data from the tracking again (got back the EXC_BAD_ACCESS error each time I tried), and 2) in my BadAccessDemo mentioned, this trick somehow did not work. Could this be an error in the Swift runtime?