I have a large number of `Identifiable` value-type items associated with an open document. I need to schedule background work for each item, and I need to schedule that work dynamically, since the document may be edited while the work is being performed.
To do this, I've made a `WorkProcessor` with the following features:
- Cache results
- Cancel existing work
- Schedule new work
- Limit concurrency
I think it's working and might be useful to others. This is also my first async/await code, so there may be bugs or easier ways to do things. Please take a look and let me know what to fix.
Thanks!
import Foundation
/// Runs asynchronous `work` for `Identifiable` items while caching results,
/// replacing outdated work, and limiting how many tasks are in progress at once.
///
/// Every item ID is tracked in at most one of three states:
/// - *scheduled*: waiting for a free concurrency slot;
/// - *in progress*: a `Task` is running `work` for the item;
/// - *completed*: the result is cached.
///
/// A tracked entry only answers a request when its stored item is `==` to the
/// requested item. Otherwise the entry is treated as outdated: a cached result is
/// dropped, a running task is cancelled, and callers waiting on queued work
/// receive `CancellationError`.
///
/// Completed results stay cached until they are replaced, or removed by
/// `cancel(id:)` or `cancelAll()`.
public actor WorkProcessor<Item: Identifiable & Equatable, WorkResult> {
    /// The operation run once per (distinct) item.
    public typealias Work = (Item) async -> WorkResult
    private typealias WorkContinuation = CheckedContinuation<WorkResult, Error>

    private let work: Work
    private let workPriority: TaskPriority
    private let maxConcurrentWork: Int

    private var scheduled: [Item.ID: ScheduledItem] = [:]
    private var inProgress: [Item.ID: InProgressItem] = [:]
    private var completed: [Item.ID: CompletedItem] = [:]

    /// Gives every started task a unique token. When a task finishes, it compares
    /// its token with the current `inProgress` entry. This way it never removes
    /// (or caches over) an entry that belongs to a newer task for the same ID.
    private var nextTaskToken = 0

    /// Creates a processor.
    ///
    /// - Parameters:
    ///   - work: The operation to run for each item.
    ///   - workPriority: Priority of the tasks that run `work`.
    ///   - maxConcurrentWork: Maximum number of tracked in-progress tasks.
    ///     Defaults to `ProcessInfo.processInfo.activeProcessorCount`. Values below
    ///     1 are raised to 1, because with a limit of 0 queued work would never
    ///     start and `process(item:)` would never return.
    public init(work: @escaping Work, workPriority: TaskPriority = .medium, maxConcurrentWork: Int? = nil) {
        self.work = work
        self.workPriority = workPriority
        self.maxConcurrentWork = max(1, maxConcurrentWork ?? ProcessInfo.processInfo.activeProcessorCount)
    }

    // MARK: - Public API

    /// Returns the result of `work` for `item`.
    ///
    /// A matching cached result or in-flight task is reused when possible;
    /// otherwise new work is started or queued.
    ///
    /// - Throws: `CancellationError` if the work is cancelled before producing a
    ///   result. Cancellation happens via `cancel(id:)`, via `cancelAll()`, by a
    ///   later request for a different item with the same ID, or by cancelling the
    ///   task returned from `scheduleProcess(item:)`.
    public func process(item: Item) async throws -> WorkResult {
        switch scheduleProcess(item: item) {
        case .scheduled:
            return try await withCheckedThrowingContinuation { continuation in
                // `scheduleProcess` just stored a scheduled entry for this ID.
                // Attach the continuation so it is resumed when that work finishes
                // or is cancelled. The guard avoids a crash, and a never-resumed
                // continuation, should the entry ever be missing.
                guard scheduled[item.id] != nil else {
                    continuation.resume(throwing: CancellationError())
                    return
                }
                scheduled[item.id]?.continuations.append(continuation)
            }
        case .inProgress(let task):
            return try await task.value
        case .complete(let result):
            return result
        }
    }

    /// Ensures work for `item` is cached, running, or queued, without waiting for it.
    ///
    /// Unlike `process(item:)`, this never registers a continuation, so it is a
    /// little cheaper when the caller does not need the result.
    ///
    /// - Returns: `.complete` with a cached result, `.inProgress` with the task
    ///   computing the result, or `.scheduled` when the work is waiting for a free slot.
    @discardableResult
    public func scheduleProcess(item: Item) -> ScheduleResult {
        let id = item.id

        if let completedItem = completed[id] {
            if completedItem.isValid(for: item) {
                return .complete(completedItem.result)
            }
            completed.removeValue(forKey: id)
        }

        if let inProgressItem = inProgress[id] {
            if inProgressItem.isValid(for: item) {
                return .inProgress(inProgressItem.task)
            }
            // Outdated: drop it. The slot it held is reused for the new item below.
            inProgress.removeValue(forKey: id)
            inProgressItem.task.cancel()
        }

        if let scheduledItem = scheduled[id] {
            if scheduledItem.isValid(for: item) {
                return .scheduled
            }
            // Outdated: fail its waiting callers, then queue/start the new item below.
            cancel(id: id)
        }

        // No usable entry, so start new work now or queue it.
        if inProgress.count < maxConcurrentWork {
            return .inProgress(startTask(item: item))
        }
        scheduled[id] = ScheduledItem(item: item)
        return .scheduled
    }

    /// Cancels and forgets queued or running work for `id`, or drops its cached result.
    ///
    /// Callers waiting on that work receive `CancellationError`.
    public func cancel(id: Item.ID) {
        if var scheduledItem = scheduled.removeValue(forKey: id) {
            scheduledItem.cancel()
        } else if let inProgressItem = inProgress.removeValue(forKey: id) {
            inProgressItem.task.cancel()
            // A slot just opened. Start queued work now rather than when the
            // cancelled task eventually finishes (`work` may ignore cancellation).
            startScheduledWork()
        } else {
            completed.removeValue(forKey: id)
        }
    }

    /// Cancels all queued and running work and clears every cached result.
    public func cancelAll() {
        for var scheduledItem in scheduled.values {
            scheduledItem.cancel()
        }
        for inProgressItem in inProgress.values {
            inProgressItem.task.cancel()
        }
        scheduled.removeAll()
        inProgress.removeAll()
        completed.removeAll()
    }

    // MARK: - Private

    /// Starts queued work while concurrency slots are free.
    ///
    /// The order in which queued items start is unspecified (dictionary order).
    private func startScheduledWork() {
        while inProgress.count < maxConcurrentWork, let next = scheduled.first {
            scheduled.removeValue(forKey: next.key)
            startTask(item: next.value.item, continuations: next.value.continuations)
        }
    }

    /// Called by a task created in `startTask` once it finishes, either with a
    /// result or (`result == nil`) with an error.
    ///
    /// The result is cached only if the `inProgress` entry for `id` is still the
    /// one this task created (matching `token`). An entry for a newer task is left
    /// untouched. Either way, any freed slot is handed to queued work.
    private func taskFinished(id: Item.ID, token: Int, item: Item, result: WorkResult?) {
        if let current = inProgress[id], current.token == token {
            inProgress.removeValue(forKey: id)
            if let result = result {
                completed[id] = CompletedItem(item: item, result: result)
            }
        }
        startScheduledWork()
    }

    /// Starts a task that runs `work` for `item` and records it as in progress.
    ///
    /// The task resumes `continuations` with its outcome, and always reports back
    /// through `taskFinished`.
    ///
    /// - Precondition: No in-progress entry exists for `item.id`.
    @discardableResult
    private func startTask(item: Item, continuations: [WorkContinuation] = []) -> Task<WorkResult, Error> {
        assert(inProgress[item.id] == nil)
        let id = item.id
        let token = nextTaskToken
        nextTaskToken += 1

        let task = Task<WorkResult, Error>(priority: workPriority) { [weak self, work] in
            do {
                try Task.checkCancellation()
                let result = await work(item)
                // Don't publish a result for work that was cancelled meanwhile.
                try Task.checkCancellation()
                await self?.taskFinished(id: id, token: token, item: item, result: result)
                for continuation in continuations {
                    continuation.resume(returning: result)
                }
                return result
            } catch {
                // Report back even on cancellation, so an entry cancelled from
                // outside (via the exposed `Task`) does not hold a slot forever.
                await self?.taskFinished(id: id, token: token, item: item, result: nil)
                for continuation in continuations {
                    continuation.resume(throwing: error)
                }
                throw error
            }
        }
        inProgress[id] = InProgressItem(item: item, token: token, task: task)
        return task
    }

    // MARK: - Nested types

    /// Outcome of `scheduleProcess(item:)`.
    public enum ScheduleResult {
        /// The work is waiting for a free concurrency slot.
        case scheduled
        /// The work is running in this task.
        case inProgress(Task<WorkResult, Error>)
        /// The work already finished; this is the cached result.
        case complete(WorkResult)
    }

    /// Queued work, plus the `process(item:)` callers waiting on it.
    private struct ScheduledItem {
        let item: Item
        var continuations: [WorkContinuation]

        init(item: Item, continuations: [WorkContinuation] = []) {
            self.item = item
            self.continuations = continuations
        }

        func isValid(for item: Item) -> Bool {
            self.item == item
        }

        /// Resumes every waiting caller with `CancellationError` and forgets them,
        /// so no continuation can be resumed twice.
        mutating func cancel() {
            for continuation in continuations {
                continuation.resume(throwing: CancellationError())
            }
            continuations.removeAll()
        }
    }

    /// A running task, plus the token identifying it among tasks for the same ID.
    private struct InProgressItem {
        let item: Item
        let token: Int
        let task: Task<WorkResult, Error>

        func isValid(for item: Item) -> Bool {
            self.item == item
        }
    }

    /// A cached result for a specific item value.
    private struct CompletedItem {
        let item: Item
        let result: WorkResult

        func isValid(for item: Item) -> Bool {
            self.item == item
        }
    }
}