Swift Concurrency to run continuous work?

Hi!

I am struggling to get my head around some aspects of concurrency, and I am probably running into some serious performance issues using it as well.

A simplified version of what I am working on right now is this: there is a 4-step pipeline involving streaming, computation, file system reads/writes, and uploads to storage. The steps are sequential, yet each could run independently. In code it looks something like this:

actor Service {
    private let process: ImageProcessingUnit
    private let adapt: ResultAdapter
    private let upload: UploadQueue

    func receive(newBuffer: ImageBuffer) async throws {
        let processedResult = try await process.execute(for: newBuffer)
        let outputData = try await adapt.execute(for: processedResult)
        try await upload.execute(for: outputData)
    }
}

final class ImageProcessingUnit { ... }
final class ResultAdapter { ... }
final class UploadQueue { ... }

// somewhere else in some top-level code
func listen(...) {
    Task {
        try await service.receive(newBuffer: buffer)
    }
}

Each step does extensive work on the CPU and GPU, constantly consuming resources. Since all of this happens on a mobile device, it is quite critical to use them wisely :slight_smile:
There is also logic (omitted here) that ensures processing and uploading each run only one at a time. However, I am running into high resource usage that eventually leads to throttling, slowdowns and many other issues. My current guess is that despite the internal queueing logic, this still creates too many tasks that just sit waiting, at some point leading to issues.

What am I trying to achieve?

In the pre-concurrency world I would create 3-4 serial DispatchQueues to execute the work, and that is probably all I would need to do. In Swift Concurrency, though, I cannot figure out how to declare that there are several separate tasks, each running on its own queue. Is there an analogy? Am I doing it wrong currently? Or maybe I shouldn't use Swift Concurrency here at all?
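For illustration, the shape I have in mind looks roughly like this (the queue labels and the arithmetic stand-ins for the heavy stages are made up):

```swift
import Dispatch

// One serial queue per stage; each stage hops to the next queue when done.
// At most one buffer is inside a given stage at a time, while different
// stages can work on different buffers in parallel.
let processQueue = DispatchQueue(label: "service.process")
let adaptQueue   = DispatchQueue(label: "service.adapt")
let uploadQueue  = DispatchQueue(label: "service.upload")

func receive(_ buffer: Int, completion: @escaping (Int) -> Void) {
    processQueue.async {
        let processed = buffer * 2        // stand-in for heavy processing
        adaptQueue.async {
            let adapted = processed + 1   // stand-in for adaptation
            uploadQueue.async {
                completion(adapted)       // stand-in for the upload step
            }
        }
    }
}
```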

From what I understand, would creating multiple actors help here?
You can try Worker Pool for that:

protocol Worker: Actor {
  associatedtype WorkItem: Sendable
  associatedtype WorkResult: Sendable
  
  func submit(work: WorkItem) async throws -> WorkResult
}

struct WorkerPoolError: Error {}

actor WorkerPool<W> where W: Worker {
  
  typealias WorkItem = W.WorkItem
  typealias WorkResult = W.WorkResult
  
  private var workers: [W] = []
  private var currentIndex: Int = 0

  init(workers: [W]) {
    self.workers = workers
  }
 
  func submit(work item: WorkItem) async throws -> WorkResult {
    try await self.getNextWorker()
      .submit(work: item)
  }

  private func getNextWorker() throws -> W {
    guard !self.workers.isEmpty else {
      throw WorkerPoolError()
    }
    let nextWorker = self.workers[currentIndex]
    self.currentIndex = (self.currentIndex + 1) % self.workers.count
    return nextWorker
  }
}

and then just wrap a work:

actor Service {
  
  private let pool: WorkerPool<ServiceWorker> // e.g. .init(workers: (0..<4).map { _ in ServiceWorker() })
  
  func receive(newBuffer: ImageBuffer) async throws {
    try await self.pool.submit(work: newBuffer)
  }
}

actor ServiceWorker: Worker {
  
  private let process: ImageProcessingUnit
  private let adapt: ResultAdapter
  private let upload: UploadQueue
  
  func submit(work: ImageBuffer) async throws {
    let processedResult = try await process.execute(for: work)
    let outputData = try await adapt.execute(for: processedResult)
    try await upload.execute(for: outputData)
  }
}
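One pitfall to watch out for when constructing the pool: `Array(repeating:count:)` evaluates its argument once, so for reference types (actors included) it produces N references to the same instance rather than N independent workers. A tiny self-contained demonstration with a plain class:

```swift
// Reference types: `repeating` reuses one instance, `map` creates fresh ones.
final class Ref {}

let repeated = Array(repeating: Ref(), count: 4)
let distinct = (0..<4).map { _ in Ref() }

assert(repeated[0] === repeated[1]) // four references to ONE object
assert(distinct[0] !== distinct[1]) // four separate objects
```

So build the workers with something like `(0..<4).map { _ in ServiceWorker() }` to get a pool of truly independent actors.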
1 Like

That's probably the right conclusion (assuming you mean "Swift Concurrency i.e. Task etc" and not literally concurrency).

Swift Concurrency isn't designed for long-running tasks. It is possible to use it for them, as long as you can guarantee forward progress and latency isn't a concern, but it's easier to just use GCD.

1 Like

So I generally wanted to keep actors to a minimum here. My perspective and understanding of them is that I only need Service to be an actor, because it has some shared state in it.

Still, wouldn't each step inside the worker leave the actor to execute its work anyway?

Well, by Swift Concurrency I do not mean just Tasks, but the way my implementation currently comes out really narrows down to a Task and an actor, and I feel there is something wrong. I would like to utilise all its capabilities, but I am not sure how to do it right.

To reiterate: I want behaviour similar/equivalent to running each step inside the receive method on a separate queue. Or just anything more deterministic. Because as I currently see that system (and the workers approach as well), due to actor reentrancy and Task creation it becomes a mess at runtime.

As for the second paragraph, I am not concerned about latency here, and may even prefer some latency to reduce load. So am I correct that in general the new concurrency in Swift isn't going to replace GCD completely? That for some tasks it is still more suitable to use another approach? I thought it was designed as a replacement; I actually have too little experience with the async/await approach at large scale and only fully dived into it with Swift.

That's why I haven't responded to the whole message: it's not quite obvious what those classes inside the actor are, since at first glance there is actually no shared state.
Anyway, actors are also about communication and the processing logic inside them.

But you have 4 of them :upside_down_face: Or whatever you define.

Personally, I would go play with more actors and custom executors, but to be honest I haven't touched the latter myself yet; I also need to figure it out.

1 Like

My perspective could be wrong here. I mean, I've read about actors and made up my mind that I need them really rarely, only when there is shared state involved. Yet you are right, I had completely missed the point of Service being an actor.

So the classes inside are just commands. Complex, but stateless commands. Service is the only stateful party here. How I understood your suggestion is that I can treat actors as separate queues, which seemed wrong to me before. I thought of actors as just entities that guard shared state in some way, and that is all. If I can roughly treat actors as queues, then I suppose it makes sense to define each step as an actor. However, due to reentrancy they won't behave as serial queues, and diving into custom executors seems wrong to me here, since I'm not sure I understand the whole concurrency concept correctly.
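To make the reentrancy concern concrete, here is a tiny sketch (made-up names): every await inside an actor method is a suspension point where another call can interleave, so an actor method is not atomic the way a serial-queue work item is.

```swift
// An actor serializes access to its state, but a method is NOT atomic:
// at each `await` another call may run before this one resumes.
actor Stage {
    private(set) var log: [String] = []

    func run(_ id: Int) async {
        log.append("start \(id)")
        await Task.yield() // suspension point: a second run() may execute here
        log.append("end \(id)")
    }
}
```

With two concurrent calls, the log can read "start 1, start 2, end 1, end 2" instead of keeping each call whole.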

1 Like

For me it sometimes helps to think about actors as Erlang/Elixir processes, as a primitive for isolated computation. It's just naming, but still. :slightly_smiling_face:

I’m now curious to play with actors + custom executors myself for long running heavy jobs :thinking: as have an idea where I can apply that.

1 Like

That's not really right. There's nothing "not designed for long-running tasks" about Swift's tasks at all; the underlying runtime is all the same.

All the recent work on custom actor executors, and task executors especially, is aimed at allowing any workload, including "bad blocking code", on Swift Concurrency tasks.

One could argue that task executors are specifically going to solve this kind of issue, and they are not yet available in a stable release. But still, it's not that Swift Concurrency isn't designed for some kind of work; it's about customizing where each workload runs.


I really like your worker pool example @jaleel, makes my Akka-past heart beat with happiness :laughing: That's the usual "pattern" :slight_smile:

Yes a pool like that is something you could do in a pure-actors world -- and put them on custom executors if you needed them to do blocking work etc.

3 Likes

I wish there were some nice "standard" libraries for common use cases.

It’s always helpful to try to formulate what those are — are you looking for something like the above pool but on a different executor?

Are we talking about Apple platforms, cross-platform, or specifically "not Dispatch"?

So my initial thoughts seem to be correct in light of that: currently there is no way to explicitly define where a task runs, and custom executors are the solution to that.

Could actors still be used as a mechanism to ensure that, in a (limited?) way? For example, I don't need a pool of workers, since several workers would be too much for me here, but if I need to execute some continuous work in the background, is putting that work on an actor a good approach?

Yeah, without task executors the way to control the “where to execute” of tasks is by using an actor with some specific executor.
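A minimal sketch of what that can look like with the SE-0392 custom actor executor APIs (Swift 5.9+). The names here are made up, and a production version would likely want a dedicated thread rather than a plain DispatchQueue:

```swift
import Dispatch

// A serial executor backed by its own DispatchQueue. Conforming to
// SerialExecutor implies Sendable, so a final class with immutable
// state is required.
final class QueueExecutor: SerialExecutor {
    private let queue: DispatchQueue

    init(label: String) {
        self.queue = DispatchQueue(label: label)
    }

    func enqueue(_ job: consuming ExecutorJob) {
        let unownedJob = UnownedJob(job)
        queue.async {
            unownedJob.runSynchronously(on: self.asUnownedSerialExecutor())
        }
    }

    func asUnownedSerialExecutor() -> UnownedSerialExecutor {
        UnownedSerialExecutor(ordinary: self)
    }
}

// An actor pinned to that executor: its jobs run on the dedicated queue
// instead of competing for the shared cooperative thread pool.
actor PinnedWorker {
    private let executor = QueueExecutor(label: "pinned.worker")

    nonisolated var unownedExecutor: UnownedSerialExecutor {
        executor.asUnownedSerialExecutor()
    }

    func compute(_ n: Int) -> Int {
        n * n // stand-in for heavy synchronous work
    }
}
```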

The Service actor will make sure that code only runs on a single thread at a time, thus avoiding traditional multithread race conditions, but what it won't do is ensure that functions run atomically. It is not a queue, and is subject to reentrance, as you point out yourself.

The problem is that your listen function is not queuing anything, so you will have many calls to receive in-flight at the same time. E.g. one call might have just finished process.execute, while another one is currently executing adapt.execute. You get interleaving, and potentially an explosion of tasks.

If I were you, I would introduce queuing in the top-level listen function. Instead of just starting Tasks as they come in, add them to an AsyncStream, which is the Swift Concurrency equivalent of a queue (AsyncStream | Apple Developer Documentation).

Off the top of my head (totally untested), it would look a bit like this...

var streamContinuation: AsyncStream<ImageBuffer>.Continuation!
var stream: AsyncStream<ImageBuffer> = AsyncStream { continuation in
    streamContinuation = continuation
}

func listen(...) {
    streamContinuation.yield(buffer)
} 

Task {
    for await imageBuffer in stream {
         await service.receive(newBuffer: imageBuffer)
    }
}
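One more knob worth knowing about if buffers can arrive faster than the pipeline drains them: AsyncStream's buffering policy, which bounds the queue instead of letting it grow without limit (the limit of 4 here is arbitrary, and `makeStream` requires Swift 5.9):

```swift
// Bounded stream: once 4 elements are buffered, the oldest is dropped,
// so only the most recent buffers are kept for the consumer.
let (stream, continuation) = AsyncStream.makeStream(
    of: Int.self,
    bufferingPolicy: .bufferingNewest(4)
)
```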

3 Likes

There are queues inside two of the steps, so processing and uploading are executed one by one. But the approach of using an async stream to achieve this looks interesting; I never thought of that (well, I haven't used streams yet). Need to try it, thanks!

So, I've played a bit with different approaches yesterday, and so far here are my results:

  1. On one device there is not much difference between running multiple tasks vs. one actor with reentrancy vs. multiple worker actors. Especially if there are lots of simultaneous jobs: at some point everything starts using all the cores. This, I guess, is fine and how Swift Concurrency works by design.
    :top: So in the end it's all about encapsulation and where you keep your code. If there are multiple heavy executions, one must think about a better solution.

  2. I've managed to write a simple custom thread and, using custom executors, run actor jobs on that specific thread. This gives the ability to control and isolate heavy execution, so that other cores/threads don't starve.
    :top: With this approach, though, it's important to understand that all services like ImageProcessingUnit, ResultAdapter etc. should be either non-async or themselves actors with controlled execution. You can't create a generic solution for now, because as soon as you hit an await inside the actor, it will run on a separate thread. (Maybe isolation inheritance will fix that, @ktoso? Need to read the proposal again...)

Here is a screenshot and a URL to play with: a simple app running a big loop as a heavy CPU job.

It would also be nice to play with AsyncStreams :top: but for that I want a slightly more interesting example than just running a big loop.

Guess that's no surprise, as I was reading too much of your code last year :melting_face::upside_down_face:

3 Likes

A bit more explanation of what's on the CPU usage graphs:

On the left is the regular async/actor approach. It's actually quite heavy and even makes life a bit hard for an M1. You can notice it when running the app on macOS and sending it to the background and back again.
On the right are 2 workers with custom threads; the execution takes longer, but everything is completely smooth.

2 Likes

Thanks for the example. I was going to try custom executors over the weekend; I suppose it will be the starting point to see how this works in practice, and to try streams too.

This CPU usage is what I was afraid of initially, and it seems that fear was reasonable. Custom executors give exactly the level of control that seemed necessary in this situation.

1 Like

Note though, to see that effect you need multiple heavy jobs running simultaneously. I would say that in the real world regular async/await will work for 99% of cases.

Also, with custom executors I think you'll need to make ImageProcessingUnit, ResultAdapter and UploadQueue actors that implement custom execution :thinking: Service then just runs them.

Totally agree that in most cases the regular tools work perfectly. In the year-plus I've been using the language's new concurrency features, this is the first time I've suspected they might be a problem.

Similar thoughts here :slight_smile:

1 Like