What is the Swift Concurrency-compliant way to wait for another Task to consume shared data?
Context:
I have one Task receiving events and another Task consuming these events. I want the receiving Task to wait for the consumer Task to have processed the event and published a result before fetching the next event.
I tried to implement it like this: the receiving Task creates a continuation and passes it alongside the event. The consumer Task calls continuation.resume(...) when it is done processing.
Simplified code:
let response = await withCheckedContinuation { (continuation: CheckedContinuation<ByteBuffer, Never>) in
    // send a request and wait for the response
    try await outbound.get("/next")
    // wait for next invocation
    let (headers, body) = try await inboundIterator.readFullResponse()
    // invocation received, wrap it in an Invocation object
    let invocation = Invocation(metadata: try InvocationMetadata(headers: headers), event: body, continuation: continuation)
    // enqueue invocation in a shared structure on which our caller can iterate
    await self.invocationsPool.push(invocation)
    // the consumer of the invocation will call continuation.resume()
}
// now we have a response
try await outbound.post("/response", body: response)
The above doesn't work because we can't have an async closure for continuations.
error: cannot pass function of type '(CheckedContinuation<ByteBuffer, Never>) async throws -> Void' to parameter expecting synchronous function type
I tried using an AsyncSequence instead of a continuation and it works, but since there is a 1:1 relationship between the invocation and the result, and because the server will always send me one event at a time, I feel this is overkill.
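For reference, that variant looked roughly like this (a sketch from memory, not the exact code; AsyncStream.makeStream and the responseContinuation label are just illustrative):
let (responses, responseContinuation) = AsyncStream.makeStream(of: ByteBuffer.self)

// send a request and wait for the next invocation
try await outbound.get("/next")
let (headers, body) = try await inboundIterator.readFullResponse()

// pass the stream's continuation alongside the event;
// the consumer yields exactly one result and finishes the stream
let invocation = Invocation(
    metadata: try InvocationMetadata(headers: headers),
    event: body,
    responseContinuation: responseContinuation
)
await self.invocationsPool.push(invocation)

// wait for the single response, even though the stream could carry many
guard let response = await responses.first(where: { _ in true }) else {
    fatalError("consumer finished the stream without yielding a response")
}
try await outbound.post("/response", body: response)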
What's the proper way to implement the above in Swift 6?
Thank you both. @ole indeed, this solution does not work. @t089 can you elaborate a bit?
pool.push() is async because the pool is a shared data structure already protected by a Mutex.
How will adding a second one help? Most importantly: accesses to the Mutex will be async, won't they?
So, very handwavy, off the top of my head, without taking cancellation into account, and a bit shaky on the API details: this is what I had in mind. You need to solve the problem from two sides. The pool could provide something like this:
import Synchronization

final class Pool: Sendable {
    struct State {
        var items: [Invocation] = []
        var waiters: [CheckedContinuation<Invocation, Never>] = []
    }

    enum Action {
        case resume(waiter: CheckedContinuation<Invocation, Never>, invocation: Invocation)
        case doNothing
    }

    let state = Mutex(State())

    // pushes an invocation to the pool, or wakes up a waiter if there is one
    func push(_ item: Invocation) {
        let action: Action = self.state.withLock {
            if !$0.waiters.isEmpty {
                // take a waiter and hand it the invocation
                return .resume(waiter: $0.waiters.removeFirst(), invocation: item)
            } else {
                $0.items.append(item)
                return .doNothing
            }
        }
        // resume outside the lock
        switch action {
        case .resume(let waiter, let item):
            waiter.resume(returning: item)
        case .doNothing:
            break
        }
    }

    // suspends until there is an invocation to process
    // the caller is REQUIRED to resume the continuation inside the invocation.
    func poll() async -> Invocation {
        return await withCheckedContinuation { cont in
            let item: Invocation? = self.state.withLock {
                if !$0.items.isEmpty {
                    return $0.items.removeFirst()
                } else {
                    // nothing buffered: park the continuation as a waiter
                    $0.waiters.append(cont)
                    return nil
                }
            }
            if let item {
                cont.resume(returning: item)
            }
        }
    }
}
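And very roughly, the two sides could then look like this (again hand-wavy: process(_:) is a placeholder for the actual handler, and I'm assuming Invocation exposes the continuation it carries):
// Receiving Task: do all the async work *before* creating the continuation,
// so the withCheckedContinuation closure stays synchronous.
try await outbound.get("/next")
let (headers, body) = try await inboundIterator.readFullResponse()
let metadata = try InvocationMetadata(headers: headers)

let response = await withCheckedContinuation { (cont: CheckedContinuation<ByteBuffer, Never>) in
    // push is synchronous now (Mutex-protected), so calling it here is fine
    pool.push(Invocation(metadata: metadata, event: body, continuation: cont))
}
try await outbound.post("/response", body: response)

// Consumer Task: poll, process, and resume the invocation's continuation exactly once.
while true {
    let invocation = await pool.poll()
    let result = await process(invocation) // placeholder for the actual handler
    invocation.continuation.resume(returning: result)
}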
This looks good, but I think there might be a race between the two Mutexes:
Task 1 goes into push(), does not find a continuation, and is now at line 368, but before acquiring the _buffer lock.
At the same time Task 2 goes into next(): it acquires the _buffer lock, does not find anything in the buffer, and releases the lock again. Now Task 1 acquires the _buffer lock and writes the invocation into the buffer. Task 2 then acquires the _continuation lock and stores the continuation.
In the end you have one item in the buffer and a stored waiter continuation, but they failed to meet each other.
I think if you combine the _continuation and _buffer behind a single mutex, you can avoid this situation.