Serial Execution in Swift Concurrency

Hello guys!

I'm trying to work out the best approach for making sure all my submitted tasks are executed in FIFO order, and I would like to avoid using GCD.

Is there an established way to set this up with Swift Concurrency?

This is my current approach:


actor SerialQueueActor {

    private var previousTask: Task<Void, Error>?

    func addTask(_ task: @escaping () async -> Void) {
        previousTask = Task { [previousTask] in
            try? await previousTask?.value
            await task()
        }
    }
    
    func wait() async {
        try? await previousTask?.value
    }
}


final class TaskExecutor: TaskExecutable {
    
    let eventQueue: SerialQueueActor
    let crashQueue: SerialQueueActor
    let emitterQueue: SerialQueueActor
    
    init() {
        eventQueue = SerialQueueActor()
        crashQueue = SerialQueueActor()
        emitterQueue = SerialQueueActor()
    }
    
    func dispatch(type: TaskQueue, _ task: @escaping () async -> Void) {
        Task(priority: .high) {
            switch type {
            case .event:
                await eventQueue.addTask(task)
            case .crash:
                await crashQueue.addTask(task)
            case .emitter:
                await emitterQueue.addTask(task)
            }
        }
    }
    
    func dispatch(type: TaskQueue, _ task: @escaping () async -> Void) async {
        switch type {
        case .event:
            await eventQueue.addTask(task)
        case .crash:
            await crashQueue.addTask(task)
        case .emitter:
            await emitterQueue.addTask(task)
        }
    }
    
}

I feel that chaining tasks to make sure they execute serially is something of a workaround. Maybe there are better ways. Any help would be highly appreciated.

I would be OK with mixing GCD and Swift Concurrency, but I guess that's not recommended.
Also, keep in mind that I need to support iOS 13.

I might suggest AsyncChannel. In your example, it might be a channel of closures. E.g., perhaps:

import AsyncAlgorithms

enum TaskQueue {
    case event
    case crash
    case emitter
}

actor TaskExecutor {
    typealias Work = @Sendable () async -> Void

    let eventQueue = AsyncChannel<Work>()
    let crashQueue = AsyncChannel<Work>()
    let emitterQueue = AsyncChannel<Work>()

    func dispatch(type: TaskQueue, _ task: @escaping Work) async {
        switch type {
        case .event:   await eventQueue.send(task)
        case .crash:   await crashQueue.send(task)
        case .emitter: await emitterQueue.send(task)
        }
    }

    func start() async {
        await withTaskGroup { group in
            group.addTask {
                for await work in self.eventQueue {
                    await work()
                }
            }
            group.addTask {
                for await work in self.crashQueue {
                    await work()
                }
            }
            group.addTask {
                for await work in self.emitterQueue {
                    await work()
                }
            }
        }
    }
}

I must confess that I would generally look for channels where I'm sending some model object, but, if you must, you can use closures, too.
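To illustrate the "model object" idea, here is a minimal sketch, not taken from the thread. It assumes a hypothetical `TrackerEvent` enum and uses the standard library's `AsyncStream` in place of `AsyncChannel` so that it needs no package. A single consumer handles values one at a time, in the order they were sent.

```swift
// Hypothetical model type; the cases and payloads are illustrative only.
enum TrackerEvent: Sendable, Equatable {
    case event(name: String)
    case crash(reason: String)
}

// Sends two values, then lets one consumer handle them in FIFO order.
func runModelQueueDemo() async -> [TrackerEvent] {
    // AsyncStream stands in for AsyncChannel here (an assumption of this sketch).
    let (events, continuation) = AsyncStream.makeStream(of: TrackerEvent.self)

    let consumer = Task { () -> [TrackerEvent] in
        var handled: [TrackerEvent] = []
        for await event in events {
            handled.append(event) // real code would do its async work here
        }
        return handled
    }

    continuation.yield(.event(name: "launch"))
    continuation.yield(.crash(reason: "example"))
    continuation.finish() // ends the for-await loop once the buffer is drained

    return await consumer.value
}
```

Sending values rather than closures keeps the decision about what to do with each item in one place, the consumer, which can make the queue easier to test.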

Anyway, the virtue of this approach is that it keeps you within the realm of structured concurrency, in which case, should you ever need it, you can gracefully support cancellation. You should generally be a little wary of creating a lot of unstructured concurrency.
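As a rough sketch of what cancellation support can look like, the following runs a consumer loop as a child of a task group and then cancels the group. It again substitutes `AsyncStream` for `AsyncChannel` so it is self-contained; the function name and the 50 ms pause are assumptions made for the demonstration.

```swift
typealias Work = @Sendable () async -> Void

// Returns true when the consumer loop ended because the group was cancelled.
func runCancellationDemo() async -> Bool {
    let (queue, continuation) = AsyncStream.makeStream(of: Work.self)

    return await withTaskGroup(of: Bool.self, returning: Bool.self) { group in
        group.addTask {
            for await work in queue {
                await work()
            }
            // Reached when the stream finishes or the child task is cancelled.
            return Task.isCancelled
        }

        continuation.yield({ print("work item ran") })
        // Arbitrary pause so the consumer has a chance to pick up the item.
        try? await Task.sleep(nanoseconds: 50_000_000)

        group.cancelAll() // cancellation propagates to the child task
        return await group.next() ?? false
    }
}
```

Because the loop lives inside the group, cancelling whatever task calls `runCancellationDemo()` would also cancel the consumer, with no extra bookkeeping.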

Anyway, that will process them in order:

(The ⓢ signposts indicate when the event was sent, and the interval (of the same color) represents when it ran.)


Thank you for your suggestion.

Does that mean I have to call start() every time I want to process the tasks submitted to the channel?

What if I need to process tasks whenever something is submitted to the channel? Do I need to set up some sort of timer?

Or should I just call start() at the end of every dispatch call? I guess that would not be efficient.

You just call start once, to start monitoring/processing the channels.

Thank you, @robert.ryan.

Good example.

I couldn't resist trying it. 🙂

// [https://forums.swift.org/t/serial-execution-in-swift-concurrency/79003/2]

@main
enum Driver {
    static func main () async throws {
        let u = TaskExecutor ()
       
        print ("start...")
        // Start it in a task to avoid getting stuck
        Task {
            await u.start()
        }

        print ("dispatch...")
        await u.dispatch (type: .event) {
            print ("1 event work finished")
        }
        
        await u.dispatch (type: .emitter) {
            print ("2 emitter work finished")
        }
        
        await u.dispatch (type: .crash) {
            print ("3 crash work finished")
        }
        
        print ("sleep for 7 seconds...")
        try await Task.sleep (until: .now + .seconds (7))
        print ("awake")
    }
}

Outputs

start...
dispatch...
1 event work finished
3 crash work finished
sleep for 7 seconds...
2 emitter work finished
awake

I have another class from which I call the start method, but none of the work items submitted after that call ever get processed.

One would call start once per instance of this TaskExecutor. You cannot have multiple for-await loops iterating through the same AsyncChannel.

As an aside, I might advise against the “executor” terminology, as that already has a meaning with Swift concurrency (e.g., SE-0392). The use of the term in this context might only serve to confuse.

I guess that's what I already do: I call start when my main class is initialised.

final class Tracker {
    
    private let executor: TaskExecutor
    
    init(executor: TaskExecutor) {
        self.executor = executor
        
        Task {
            await sendSystemEvent()
            await executor.start() // Only events submitted before start will be processed
        }
    }
    
    private func sendSystemEvent() async {
        await executor.dispatch(type: .event) {
            // some async work
        }
    }
    
    public func trackEvent(_ name: String) async {
        await executor.dispatch(type: .event) {
            // some async work
        }
    }
}

But if trackEvent is called after start(), no event is processed.

UPDATE: I've tried to implement this in an empty app and it actually worked. Thank you. I will try to figure out what could be wrong with my setup.
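One possible cause, offered as an assumption rather than a diagnosis: `AsyncChannel.send(_:)` suspends until a consumer receives the element, so awaiting a dispatch before `start()` in the same task may never return. The sketch below sidesteps that ordering constraint with the standard library's `AsyncStream`, whose `yield` buffers the item and returns immediately. `SerialWorkQueue` and `Log` are hypothetical names introduced only for this example.

```swift
// Hypothetical FIFO queue built on AsyncStream (a swap for AsyncChannel).
final class SerialWorkQueue: Sendable {
    typealias Work = @Sendable () async -> Void

    private let stream: AsyncStream<Work>
    private let continuation: AsyncStream<Work>.Continuation

    init() {
        let (stream, continuation) = AsyncStream.makeStream(of: Work.self)
        self.stream = stream
        self.continuation = continuation
    }

    /// Buffers the work item and returns immediately; may be called before `run()`.
    func enqueue(_ work: @escaping Work) {
        continuation.yield(work)
    }

    /// Runs items one at a time, in FIFO order, until `finish()` or cancellation.
    func run() async {
        for await work in stream {
            await work()
        }
    }

    /// Ends the stream; items already buffered are still delivered first.
    func finish() {
        continuation.finish()
    }
}

// Small helper that records the order in which work items ran.
actor Log {
    private(set) var lines: [String] = []
    func add(_ line: String) { lines.append(line) }
}

func runQueueDemo() async -> [String] {
    let queue = SerialWorkQueue()
    let log = Log()

    queue.enqueue { await log.add("first") }   // enqueued before any consumer exists
    let consumer = Task { await queue.run() }
    queue.enqueue { await log.add("second") }  // enqueued after the consumer started
    queue.finish()

    await consumer.value
    return await log.lines
}
```

The trade-off is that the default buffer is unbounded, so this variant gives up the back-pressure that `AsyncChannel` provides.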

More information about your app would be helpful.

For example, one could easily run into problems with a command-line app: the app could terminate before the unstructured task that launched start has finished.

This is why we avoid unstructured concurrency wherever possible, and where we must use it, ensure (a) we properly handle cancellation; and/or (b) await the results of that Task before the app terminates. Ideally, we stay within structured concurrency.
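A minimal sketch of points (a) and (b), using a made-up workload: the task checks for cancellation on each pass, and the caller awaits its value so that the process cannot exit before the work completes. The function name and the summing loop are assumptions made purely for illustration.

```swift
// Starts an unstructured task, then waits for it before returning.
func sumInBackground() async -> Int {
    let worker = Task { () -> Int in
        var total = 0
        for value in 1...100 {
            if Task.isCancelled { break } // (a) stop early when cancelled
            total += value
            await Task.yield()            // suspension point, as real async work would have
        }
        return total
    }

    // (b) Without this await, a command-line app could terminate first.
    return await worker.value
}
```

Keeping the `Task` handle around is what makes both options possible; a fire-and-forget `Task { ... }` can be neither cancelled nor awaited later.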

But keep us posted and good luck.


I am playing around with Swift Concurrency in Instruments and noticed there are always 2 tasks alive. Is that because the task group, after start() is called, keeps them alive while they wait for the next work items to process? I guess that's what I need, but is it OK that they stay permanently suspended unless cancelled?

struct ContentView: View {
    
    @ObservedObject var viewModel = ContentViewModel()

    var body: some View {
        Button {
            Task {
                await self.viewModel.trackEvent()
            }
            Task {
                await self.viewModel.trackCrash()
            }
        } label: {
            Label("Add Item", systemImage: "plus")
        }.onAppear {
            Task {
                await self.viewModel.start()
            }
        }
    }
}

final class ContentViewModel: ObservableObject {
    
    let executor = TaskExecutor()
    
    func start() async {
        await executor.start()
    }
    
    func trackEvent() async {
        await executor.dispatch(type: .event) { [weak self] in
            self?.calculateSomethingIntensive()
            print("event is done")
        }
    }
    
    func trackCrash() async {
        await executor.dispatch(type: .crash) { [weak self] in
            self?.calculateSomethingIntensive()
            print("crash  is done")
        }
    }
    
    
    func calculateSomethingIntensive() {
        // Simulate heavy calculation
        var result = 0
        for i in 0..<1_000 {
            result += i % 100
        }
    }
    
}

Yes, this is OK, as long as the number of tasks is reasonably limited. You generally want to avoid having thousands or millions of tasks (largely for memory reasons), but having a few isolated tasks associated with real asynchronous work is not uncommon.

E.g., we use this AsyncSequence pattern with notifications, so that we can handle meaningful notifications that happen while the app is running. The task associated with the asynchronous iteration through the notifications is something that will be running as long as the app is running. There are lots of examples of these sorts of long-running asynchronous tasks that might coincide with the lifespan of an app. As long as these tasks are all appropriately accounted for, you’re fine.
