flopshot
(Sean Najera)
1
I have async closures I'd like to run
let workAClosure: @Sendable () async -> Void = {
    try? await Task.sleep(for: .seconds(1))
    print("Work A: Almost done")
    try? await Task.sleep(for: .seconds(1))
    print("Work A: Complete")
}
let workBClosure: @Sendable () async -> Void = {
    try? await Task.sleep(for: .seconds(1))
    print("Work B: Almost done")
    try? await Task.sleep(for: .seconds(1))
    print("Work B: Complete")
}
Goal: I want all the work in one closure to finish before the next closure of work starts.
If I queue this work in an AsyncStream using a Task
// Task is generic, so the element type has to be spelled out in full
let (serialStream, serialContinuation) = AsyncStream.makeStream(of: Task<Void, Never>.self)
self.serialContinuation = serialContinuation
self.serialStreamTask = Task {
    for await task in serialStream {
        // Forward cancellation of the stream task to the task currently being awaited
        await withTaskCancellationHandler {
            await task.value
        } onCancel: {
            task.cancel()
        }
    }
}
let workA = Task(operation: workAClosure)
let workB = Task(operation: workBClosure)
let resultA = serialContinuation.yield(workA)
let resultB = serialContinuation.yield(workB)
The order of the print statements is nondeterministic.
"Work B" lines may print before "Work A" lines.
However, if I use a simple Work struct instead of Task
struct Work {
    let operation: @Sendable () async throws -> Void
    func wait() async throws {
        try await operation()
    }
    func cancel() {
        // figure out a way to cancel "operation" in Work
    }
}
let (serialStream, serialContinuation) = AsyncStream.makeStream(of: Work.self)
self.serialContinuation = serialContinuation
self.serialStreamTask = Task {
    for await task in serialStream {
        try? await withTaskCancellationHandler {
            try await task.wait()
        } onCancel: {
            task.cancel()
        }
    }
}
let workA = Work(operation: workAClosure)
let workB = Work(operation: workBClosure)
let resultA = serialContinuation.yield(workA)
let resultB = serialContinuation.yield(workB)
The print statements always appear in the same order:
"Work A: Almost done"
"Work A: Complete"
"Work B: Almost done"
"Work B: Complete"
Could someone please explain why async closures wrapped in Task behave this way, while the same closures wrapped in Work do not?
Side note: if actors were non-reentrant, I would just use one to enqueue tasks and let each run to completion.
I've also considered third-party semaphores, but that seems risky.
Follow Up
Does anyone know a way to cancel the operation in Work? I can't seem to get a handle on that. I know I can cancel the stream itself, but it would be nice to have fine-grained cancellation per Work item.
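One possible direction for per-item cancellation, offered as a rough sketch rather than a confirmed solution: keep the work lazy, but have `wait()` start the operation inside its own `Task` and store that handle so `cancel()` has something to call. The `CancellableWork` class and the `demo()` function below are hypothetical names, not part of the code above, and the lock-based state handling is an assumption about how one might guard the stored handle.

```swift
import Foundation

// Hypothetical variant of `Work`: nothing runs until `wait()` is called,
// but the operation then runs in its own Task so it can be cancelled individually.
final class CancellableWork: @unchecked Sendable {
    private let operation: @Sendable () async throws -> Void
    private let lock = NSLock()          // guards `task` and `cancelled`
    private var task: Task<Void, Error>?
    private var cancelled = false

    init(operation: @escaping @Sendable () async throws -> Void) {
        self.operation = operation
    }

    // Starts the operation on first call, then awaits its result.
    func wait() async throws {
        guard let task = startIfNeeded() else { throw CancellationError() }
        try await task.value
    }

    // Cancels the running operation, or prevents a not-yet-started one from starting.
    func cancel() {
        lock.lock()
        cancelled = true
        let running = task
        lock.unlock()
        running?.cancel()
    }

    // Synchronous helper so the lock is never held across an `await`.
    private func startIfNeeded() -> Task<Void, Error>? {
        lock.lock()
        defer { lock.unlock() }
        if let task { return task }
        if cancelled { return nil }
        let newTask = Task { [operation] in try await operation() }
        task = newTask
        return newTask
    }
}

// Usage: cancel a long-running item shortly after it starts.
func demo() async -> Bool {
    let work = CancellableWork {
        try await Task.sleep(for: .seconds(5))
    }
    Task {
        try? await Task.sleep(for: .milliseconds(100))
        work.cancel()
    }
    do {
        try await work.wait()
        return false
    } catch {
        return error is CancellationError
    }
}

let wasCancelled = await demo()
print(wasCancelled)
```

Cancellation in Swift is cooperative, so this only helps if the operation reacts to it. Closures that swallow the error with `try? await Task.sleep(...)`, like the ones above, would keep running to the end even after `cancel()` is called.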
Jon_Shier
(Jon Shier)
2
Tasks start executing immediately and run concurrently, so you'll rarely get deterministic behavior when enqueuing many at once. In your example, both tasks started as soon as you created them. You yielded two already-running tasks to the stream, and the stream loop awaited them in order, but by then they were executing concurrently and could finish in either order, regardless of the order in which you awaited them. Your solution is correct: enqueue the work itself, await it, then move on to the next value. Essentially, the work needs to be lazy.
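To make the "lazy" point concrete, here is a minimal sketch that queues plain closures instead of tasks, so each one starts only when the loop reaches it. The `Job` typealias, the `EventLog` actor, and the `runSerially` and `demo` functions are hypothetical names introduced for illustration, and the sleeps are shortened to keep the example quick.

```swift
import Foundation

typealias Job = @Sendable () async -> Void

// Hypothetical helper that records events in the order they happen.
actor EventLog {
    private(set) var events: [String] = []
    func add(_ event: String) { events.append(event) }
}

// Queues closures (not Tasks) and runs them one at a time.
// A closure does no work until the loop calls it.
func runSerially(_ jobs: [Job]) async {
    let (stream, continuation) = AsyncStream.makeStream(of: Job.self)
    for job in jobs { continuation.yield(job) }
    continuation.finish()
    for await job in stream {
        await job()   // the next job cannot start until this one returns
    }
}

func demo() async -> [String] {
    let log = EventLog()
    let jobA: Job = {
        await log.add("A start")
        try? await Task.sleep(for: .milliseconds(50))
        await log.add("A end")
    }
    let jobB: Job = {
        await log.add("B start")
        try? await Task.sleep(for: .milliseconds(50))
        await log.add("B end")
    }
    await runSerially([jobA, jobB])
    return await log.events
}

let events = await demo()
print(events)   // ["A start", "A end", "B start", "B end"]
```

Wrapping each closure in `Task(operation:)` before yielding it would start both right away, which is the interleaving described in the question.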