The full code is here, ready to be pasted into an Xcode playground.
Setup
Suppose I have an actor that protects access to a buffer of values (simplified example):
actor Buffer {
var elements: [Int] = []
func enqueue(_ value: Int) {
elements.append(value)
}
func dequeue() async -> Int {
// If the buffer is empty, wait for the next enqueued value
// ???
}
}
Now I want to have two concurrent tasks, one that fills the buffer and one that empties it:
let buffer = Buffer()
// Producer task
Task {
await buffer.enqueue(1)
try! await Task.sleep(nanoseconds: 400_000_000)
await buffer.enqueue(2)
await buffer.enqueue(3)
}
// Consumer task
Task {
while true {
let next = await buffer.dequeue()
print(next)
try! await Task.sleep(nanoseconds: 100_000_000)
}
}
(For simplicity, I omitted API to end the stream. As is, the consumer task will never complete.)
Question
My question is how to implement the dequeue
method with my desired semantics:
- If there is at least one element in the buffer, remove and return it.
- If the buffer is currently empty, suspend until another value has been enqueued.
Approach 1: spin in a loop
We could do this by spinning in an infinite loop (taking care to yield to give the other task a chance to call enqueue
):
func dequeue() async -> Int {
while true {
if !elements.isEmpty {
return elements.removeFirst()
} else {
await Task.yield()
}
}
}
That's obviously not ideal.
Approach 2: use a CheckedContinuation for signaling
What we need is a way for the enqueue
method to signal a suspended dequeue
method that it can resume. My idea was to use a CheckedContinuation
for this:
- When
dequeue
needs to suspend, it creates a continuation and stores it in a property. - When
enqueue
runs, it resumes the continuation, thus resumingdequeue
.
actor Buffer {
var elements: [Int] = []
private var isNotEmpty: CheckedContinuation<Void, Never>? = nil
deinit {
// TODO: If the continuation is not nil,
// resume it by throwing cancellation error?
}
func enqueue(_ value: Int) {
elements.append(value)
// Signal dequeue if needed.
isNotEmpty?.resume()
isNotEmpty = nil
}
func dequeue() async -> Int {
while true {
if !elements.isEmpty {
return elements.removeFirst()
} else {
// Suspend until another element is enqueued.
print("dequeue waiting")
await withCheckedContinuation { cont in
self.isNotEmpty = cont
}
}
}
}
}
In a more complex example, you could add a maximum buffer size to the Buffer
actor and use another continuation to have enqueue
suspend when the buffer is full.
Is this a valid use case for continuations?
Is this kind of intra-task communication — like a condition variable — a valid use case for continuations? Or are continuations supposed to only be used for interop with legacy code?
The full code is here, ready to be pasted into an Xcode playground.