Why does THIS execute async work synchronously but Task doesn't

I have async closures I'd like to run

let workAClosure: @Sendable () async -> Void = {
   try? await Task.sleep(for: .seconds(1))
   print("Work A: Almost done")
   try? await Task.sleep(for: .seconds(1))
   print("Work A: Complete")
}

let workBClosure: @Sendable () async -> Void = {
   try? await Task.sleep(for: .seconds(1))
   print("Work B: Almost done")
   try? await Task.sleep(for: .seconds(1))
   print("Work B: Complete")
}

Goal: I want to execute all work in a single closure before executing another closure of work/
If I queue this work in an AsyncStream using a Task

        let (serialStream, serialContinuation) = AsyncStream.makeStream(of: Task.self)
        self.serialContinuation = serialContinuation

        self.serialStreamTask = Task {
            for await task in serialStream {
                try? await withTaskCancellationHandler {
                    try await task.value
                } onCancel: {
                    task.cancel()
                }
            }
        }

        let workA = Task(operation: workAClosure)
        let workB = Task(operation: workBClosure)
        let resultA = serialContinuation.yield(workA) 
        let resultB = serialContinuation.yield(workB) 

The print statements are non deterministic.
"Work B *" prints may come before "Work A *"

However, if I use a simple Work struct instead of Task

struct Work {
    let operation: @Sendable () async throws -> Void

    func wait() async throws {
        try await operation()
    }

    func cancel() {
        // figure out a way to cancel "operation" in Work
    }
}

        let (serialStream, serialContinuation) = AsyncStream.makeStream(of: Work.self)
        self.serialContinuation = serialContinuation

        self.serialStreamTask = Task {
            for await task in serialStream {
                try? await withTaskCancellationHandler {
                    try await task.wait()
                } onCancel: {
                    task.cancel()
                }
            }
        }

        let workA = Work(operation: workAClosure)
        let workB = Work(operation: workBClosure)
        let resultA = serialContinuation.yield(workA) 
        let resultB = serialContinuation.yield(workB)  

the print statements are always deterministic

"Work A: Almost Done"
"Work A: Complete"

"Work B: Almost Done"
"Work B: Complete"

Could someone please tell me why this behavior is expected of async closures in Task but not Work

Side Note If actors had non-re-entrancy, I would just use that to enqueue tasks and allow them to run to their completion.
I've also debated using third party semaphores, but that seems risky

Follow Up
Does anyone know a way to cancel the operation in Work? I can seem to get a handle on that. I know I can cancel the stream itself, but it would be nice to have fine grain cancellation with Work

Tasks are themselves executed immediately and concurrently, so you'll rarely get deterministic behavior when enqueuing many at once. In your example they started as soon as you created them, you yielding two ongoing tasks to the stream, and stream awaited their arrival, but by then they were already executing, and could finish in either order, regardless of the order you're awaiting them. Your solution is correct: enqueue the work directly, await it, then perform the next value. Essentially, they need to be lazy.

3 Likes