I have async closures I'd like to run:
let workAClosure: @Sendable () async -> Void = {
    try? await Task.sleep(for: .seconds(1))
    print("Work A: Almost done")
    try? await Task.sleep(for: .seconds(1))
    print("Work A: Complete")
}
let workBClosure: @Sendable () async -> Void = {
    try? await Task.sleep(for: .seconds(1))
    print("Work B: Almost done")
    try? await Task.sleep(for: .seconds(1))
    print("Work B: Complete")
}
Goal: I want all the work in one closure to finish before the next closure's work starts.
If I queue this work in an AsyncStream using a Task:
// Task is generic, so the element type is spelled out in full.
let (serialStream, serialContinuation) = AsyncStream.makeStream(of: Task<Void, Never>.self)
self.serialContinuation = serialContinuation
self.serialStreamTask = Task {
    for await task in serialStream {
        try? await withTaskCancellationHandler {
            try await task.value
        } onCancel: {
            task.cancel()
        }
    }
}
let workA = Task(operation: workAClosure)
let workB = Task(operation: workBClosure)
let resultA = serialContinuation.yield(workA)
let resultB = serialContinuation.yield(workB)
The order of the print statements is nondeterministic: the "Work B *" prints may come before the "Work A *" prints.
However, if I use a simple Work struct instead of Task:
struct Work {
    let operation: @Sendable () async throws -> Void
    func wait() async throws {
        try await operation()
    }
    func cancel() {
        // figure out a way to cancel "operation" in Work
    }
}
let (serialStream, serialContinuation) = AsyncStream.makeStream(of: Work.self)
self.serialContinuation = serialContinuation
self.serialStreamTask = Task {
    for await task in serialStream {
        try? await withTaskCancellationHandler {
            try await task.wait()
        } onCancel: {
            task.cancel()
        }
    }
}
let workA = Work(operation: workAClosure)
let workB = Work(operation: workBClosure)
let resultA = serialContinuation.yield(workA)
let resultB = serialContinuation.yield(workB)
The print statements always come out in the same order:
"Work A: Almost done"
"Work A: Complete"
"Work B: Almost done"
"Work B: Complete"
Could someone please tell me why the async closures interleave when wrapped in Task but run strictly in order when wrapped in Work?
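One possible explanation, offered as a sketch rather than a confirmed answer: a Task begins running as soon as it is created, so workA and workB are both already in flight before either is yielded to the stream, and the loop only serializes the waiting. A Work value merely stores its closure, which does not run until wait() is called inside the loop. The snippet below tries to show the first half of that without any stream. EventLog and runEagerTaskDemo are hypothetical names introduced only for this illustration, and the sleep duration is arbitrary.

```swift
// Hypothetical helper: an actor that records events in arrival order.
actor EventLog {
    private(set) var events: [String] = []
    func record(_ event: String) { events.append(event) }
}

// Hypothetical demo: creates two tasks, then awaits them one after another.
func runEagerTaskDemo() async -> [String] {
    let log = EventLog()

    // Both closures are scheduled at creation time, before anything awaits them.
    let workA = Task {
        await log.record("A: started")
        try? await Task.sleep(for: .milliseconds(300))
        await log.record("A: complete")
    }
    let workB = Task {
        await log.record("B: started")
    }

    // Awaiting in sequence orders the waiting, not the execution.
    await workA.value
    await workB.value
    return await log.events
}

let events = await runEagerTaskDemo()
print(events)
```

If this reasoning holds, "B: started" should show up in the log before "A: complete", even though workB is awaited second. The relative order of the two "started" entries is left to the scheduler.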
Side note: If actors were non-reentrant, I would just use one to enqueue tasks and let each run to completion.
I've also considered using third-party semaphores, but that seems risky.
Follow Up
Does anyone know a way to cancel the operation in Work? I can't seem to get a handle on that. I know I can cancel the stream itself, but it would be nice to have fine-grained cancellation with Work.
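A minimal sketch of one way this might be approached, under stated assumptions: the work item creates its Task lazily inside wait() and keeps the handle, so nothing starts until the serial loop reaches it, while cancel() has something to call. CancellableWork is a hypothetical replacement for Work, not an existing API. It uses a class guarded by NSLock so the handle can be shared between the loop and whoever calls cancel().

```swift
import Foundation  // NSLock

// Hypothetical variant of Work that can be cancelled individually.
final class CancellableWork: @unchecked Sendable {
    private let operation: @Sendable () async throws -> Void
    private let lock = NSLock()          // guards `task` and `isCancelled`
    private var task: Task<Void, Error>?
    private var isCancelled = false

    init(operation: @escaping @Sendable () async throws -> Void) {
        self.operation = operation
    }

    // Starts the operation on first call, then waits for it to finish.
    func wait() async throws {
        try await startIfNeeded().value
    }

    // Synchronous, so the lock is never held across a suspension point.
    private func startIfNeeded() -> Task<Void, Error> {
        lock.lock()
        defer { lock.unlock() }
        if let task { return task }
        let cancelledBeforeStart = isCancelled
        let newTask = Task<Void, Error> { [operation] in
            // If cancel() arrived first, skip the operation entirely.
            if cancelledBeforeStart { throw CancellationError() }
            try await operation()
        }
        task = newTask
        return newTask
    }

    // Cancels the running operation, or marks it so it never starts.
    func cancel() {
        lock.lock()
        defer { lock.unlock() }
        isCancelled = true
        task?.cancel()
    }
}

// Usage: cancel one item while something is waiting on it.
let work = CancellableWork {
    try await Task.sleep(for: .seconds(5))
}
let waiter = Task { try await work.wait() }
try? await Task.sleep(for: .milliseconds(100))
work.cancel()

let outcome: String
do {
    try await waiter.value
    outcome = "finished"
} catch is CancellationError {
    outcome = "cancelled"
} catch {
    outcome = "failed: \(error)"
}
print(outcome)
```

Cancellation is cooperative, so this only helps if the operation checks for it, as Task.sleep does. Note also that in the original Work, operation() runs inline in the stream's task, so cancelling that task should already reach the operation currently running. The sketch is aimed at cancelling a single item without tearing down the stream.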