How do you use AsyncStream to make Task execution deterministic?

The goal of AsyncChannel is to provide communication, with back pressure, between the individual tasks pushed through the stream.

I don't believe that's the same as the discussions here, which aim to find a solution for a serial queue of async tasks.

Why not just use a serial OperationQueue and add an operation for each individual task? In that case, subclass Operation with something that holds a CheckedContinuation.

Thus you can do something like this:

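func performTask(task: SomeTask) async throws {
  // The throwing variant lets the operation report a failure back to the caller.
  try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
    operationQueue.addOperation(CheckedOperation(task, continuation))
  }
}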

Because these tasks are not concurrent, but serial, all you need to do is override main. So your operation looks like this.

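class CheckedOperation: Operation {
   let task: SomeTask
   let continuation: CheckedContinuation<Void, Error>

   init(_ task: SomeTask, _ continuation: CheckedContinuation<Void, Error>) {
       self.task = task
       self.continuation = continuation
   }

   override func main() {
       // Note: main() returns as soon as the Task is created, so the queue treats
       // this operation as finished before the async work has completed.
       Task {
           do {
               try await task.SomeAsyncFunction()
               continuation.resume()
           } catch {
               // Resume exactly once on every path so the caller is never left suspended.
               continuation.resume(throwing: error)
           }
       }
   }
}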

Also, for the queue to be serial, make sure maxConcurrentOperationCount is set to 1.
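One caveat with the operation above: main() only starts a Task and then returns, so a serial queue can begin the next operation while the previous task's async work is still running. A minimal sketch of one way around this is an asynchronous Operation that reports itself as finished only after the async work ends. The name AsyncBlockOperation and its closure-based initializer are assumptions made for illustration, not part of the original post.

```swift
import Foundation

// Hypothetical helper: an Operation that stays "executing" until its async work ends.
final class AsyncBlockOperation: Operation, @unchecked Sendable {
    private let work: @Sendable () async -> Void
    private let lock = NSLock()
    private var _executing = false
    private var _finished = false

    init(work: @escaping @Sendable () async -> Void) {
        self.work = work
        super.init()
    }

    override var isAsynchronous: Bool { true }

    override var isExecuting: Bool {
        lock.lock(); defer { lock.unlock() }
        return _executing
    }

    override var isFinished: Bool {
        lock.lock(); defer { lock.unlock() }
        return _finished
    }

    override func start() {
        // An operation cancelled before it starts must still move to "finished".
        if isCancelled {
            setState(executing: false, finished: true)
            return
        }
        setState(executing: true, finished: false)
        Task {
            await work()
            // Only now does the queue consider the operation done.
            setState(executing: false, finished: true)
        }
    }

    // OperationQueue observes these key paths through KVO.
    private func setState(executing: Bool, finished: Bool) {
        willChangeValue(forKey: "isExecuting")
        willChangeValue(forKey: "isFinished")
        lock.lock()
        _executing = executing
        _finished = finished
        lock.unlock()
        didChangeValue(forKey: "isFinished")
        didChangeValue(forKey: "isExecuting")
    }
}
```

With maxConcurrentOperationCount set to 1, a queue filled with operations like this should not start the next one until the previous closure has returned. The continuation from performTask could be resumed inside that closure.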

The syntax for all this is much easier to follow, and you don't need to worry about managing the operation execution queue.

Would love to hear your thoughts on this approach!


Here is my own version of a task queue. It is similar to yours, but a little shorter and, in my opinion, easier to understand. I didn't include any cancellation check, since I think that should be left to the tasks themselves, just as OperationQueue does.

actor TaskQueue {
    
    private let maxConcurrentTasks: Int
    private var runningTasks = 0
    private var queue = [CheckedContinuation<Void, Never>]()
    
    init(maxConcurrentTasks: Int) {
        self.maxConcurrentTasks = maxConcurrentTasks
    }
    
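    func add<T>(_ task: @escaping () async throws -> T) async throws -> T {
        if runningTasks >= maxConcurrentTasks {
            // No free slot: suspend until a finishing task hands its slot over.
            await withCheckedContinuation { continuation in
                queue.append(continuation)
            }
        } else {
            runningTasks += 1
        }
        
        defer {
            if queue.isEmpty {
                runningTasks -= 1
            } else {
                // Pass the slot straight to the next waiter without decrementing,
                // so a call arriving in between cannot push the count past the limit.
                queue.removeFirst().resume()
            }
        }
        return try await task()
    }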
    
}
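
Since the queue above does not check for cancellation, a task passed to add can do so itself. The following is a minimal sketch of such a task. The function name sumWithCancellationChecks and its summing workload are made up for illustration; only Task.checkCancellation() and Task.yield() are standard library calls.

```swift
// Hypothetical workload: sums the items, checking for cancellation before each step.
func sumWithCancellationChecks(_ items: [Int]) async throws -> Int {
    var sum = 0
    for item in items {
        // Throws CancellationError if the surrounding task has been cancelled.
        try Task.checkCancellation()
        sum += item
        // Give other tasks a chance to run between steps.
        await Task.yield()
    }
    return sum
}

// Possible use with the queue above (assumes a TaskQueue instance named `queue`):
// let total = try await queue.add { try await sumWithCancellationChecks([1, 2, 3]) }
```

If the calling task is cancelled while the closure runs, the CancellationError thrown by the check propagates out of add like any other error, and the queue's defer block still releases the slot.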