How to manage lots of dynamic background work with async await (example code looking for improvement)

I have a large number of Identifiable value type items associated with an open document. I need to schedule background work for each item. And I need to schedule that work dynamically, since the document will be edited as the work is being performed.

To do this I've made a WorkProcessor with the following features:

  1. Cache results
  2. Cancel existing work
  3. Schedule new work
  4. Limit concurrency

I think it's working and might be useful to others. Also this is my first my first async/await code, so maybe there are bugs or easier ways to do things. Please take a look and let me know what to fix.

Thanks!

import Foundation

public actor WorkProcessor<Item: Identifiable & Equatable, WorkResult> {
    
    public typealias Work = (Item) async -> WorkResult
    
    private typealias WorkContinuation = CheckedContinuation<WorkResult, Error>

    private let work: Work
    private let workPriority: TaskPriority
    private let maxConcurrentWork: Int
    private var scheduled: [Item.ID : ScheduledItem] = [:]
    private var inProgress: [Item.ID : InProgressItem] = [:]
    private var completed: [Item.ID : CompletedItem] = [:]

    public init(work: @escaping Work, workPriority: TaskPriority = .medium, maxConcurrentWork: Int? = nil) {
        self.work = work
        self.workPriority = workPriority
        self.maxConcurrentWork = maxConcurrentWork ?? ProcessInfo.processInfo.activeProcessorCount
    }
            
    public func process(item: Item) async throws -> WorkResult {
        switch scheduleProcess(item: item) {
        case .scheduled:
            return try await withCheckedThrowingContinuation { continuation in
                scheduled[item.id]!.continuations.append(continuation)
            }
        case .inProgress(let task):
            return try await task.value
        case .complete(let result):
            return result
        }
    }
    
    /// Unlike `process` this code doesn't need to manage callback continuations, so is
    /// a little more efficient for the case when caller doesn't care about work result.
    public func scheduleProcess(item: Item) -> ScheduleResult {
        if let completedItem = completed[item.id] {
            if completedItem.isValid(for: item) {
                return .complete(completedItem.result)
            } else {
                completed.removeValue(forKey: item.id)
            }
        }
        
        if let inProgressItem = inProgress[item.id] {
            if inProgressItem.isValid(for: item) {
                return .inProgress(inProgressItem.task)
            } else {
                inProgress.removeValue(forKey: item.id)
                inProgressItem.task.cancel()
            }
        }
        
        if let scheduledItem = scheduled[item.id] {
            if scheduledItem.isValid(for: item) {
                return .scheduled
            } else {
                cancel(id: item.id)
            }
        }

        // No cached value found, or cache outdated, so schedule new work.
        
        if inProgress.count < maxConcurrentWork {
            return .inProgress(createTask(item: item).task)
        } else {
            scheduled[item.id] = ScheduledItem(item: item)
            return .scheduled
        }
    }

    public func cancel(id: Item.ID) {
        if var scheduledItem = scheduled.removeValue(forKey: id) {
            scheduledItem.cancel()
            return
        }

        if let inProgressItem = inProgress.removeValue(forKey: id) {
            inProgressItem.task.cancel()
            return
        }

        completed.removeValue(forKey: id)
    }
    
    public func cancelAll() {
        for var each in scheduled.values {
            each.cancel()
        }
        
        for each in inProgress.values {
            each.task.cancel()
        }

        scheduled.removeAll()
        inProgress.removeAll()
        completed.removeAll()
    }
    
    private func completed(item: Item, result: WorkResult) {
        if let inProgressItem = inProgress.removeValue(forKey: item.id), inProgressItem.isValid(for: item) {
            completed[item.id] = CompletedItem(item: item, result: result)
        }
        
        if let element = scheduled.randomElement(), inProgress.count < maxConcurrentWork {
            let scheduledItem = element.value
            scheduled.removeValue(forKey: element.key)
            _ = createTask(item: scheduledItem.item, continuations: scheduledItem.continuations)
        }
    }

    private func createTask(item: Item, continuations: [WorkContinuation]? = nil) -> InProgressItem {
        assert(inProgress[item.id] == nil)
        
        let inProgressItem = InProgressItem(item: item, task: Task(priority: workPriority) { [weak self, work] in
            try Task.checkCancellation()
            let result = await work(item)
            try Task.checkCancellation()
            if let self = self {
                await self.completed(item: item, result: result)
            }
            return result
        })
        
        if let continuations = continuations {
            Task(priority: workPriority) {
                do {
                    let result = try await inProgressItem.task.value
                    for each in continuations {
                        each.resume(returning: result)
                    }
                } catch {
                    for each in continuations {
                        each.resume(throwing: error)
                    }
                }
            }
        }
        
        inProgress[item.id] = inProgressItem
        
        return inProgressItem
    }

    public enum ScheduleResult {
        case scheduled
        case inProgress(Task<WorkResult, Error>)
        case complete(WorkResult)
    }

    private struct ScheduledItem {
        let item: Item
        var continuations: [WorkContinuation]
        func isValid(for item: Item) -> Bool {
            self.item == item
        }

        init(item: Item, continuations: [WorkContinuation] = []) {
            self.item = item
            self.continuations = continuations
        }
        
        mutating func cancel() {
            for each in continuations {
                each.resume(throwing: CancellationError())
            }
        }
    }

    private struct InProgressItem {
        let item: Item
        let task: Task<WorkResult, Error>
        func isValid(for item: Item) -> Bool {
            self.item == item
        }
    }

    private struct CompletedItem {
        let item: Item
        let result: WorkResult
        func isValid(for item: Item) -> Bool {
            self.item == item
        }
    }

}