How to use `withCheckedContinuation` with an async closure?

What is Swift Concurrency compliant wait to wait for another Task to consume shared data ?

Context:
I have one Task receiving events. I have another Task consuming these events. I want the receiving Task to wait for the consumer Task to have process the event and publish a result before fetching the next event.

I try to implement this like this. The receiving event Task create a continuation pass it alongside the event. The consumer Task call continuation.resume... when done processing.

Simplified code:

let response = await withCheckedContinuation { (continuation: CheckedContinuation<ByteBuffer, Never>)  in
        // send a request and wait for the response
        try await outbound.get("/next")

        // wait for next invocation
        let (headers, body) = try await inboundIterator.readFullResponse()

        // invocation received, wrap it in an Invocation object
        let invocation = Invocation(metadata: try InvocationMetadata(headers: headers), event: body, continuation: continuation)

        // enqueue invocation in a shared structured on which our caller can iterate
        await self.invocationsPool.push(invocation)

        // the consumer of the invocation will call continuation.resume()
    }

    // now we have a response
    try await outbound.post("/response", body: response)

The above doesn't work because we can't have an async closure for continuations.

error: cannot pass function of type '(CheckedContinuation<ByteBuffer, Never>) async throws -> Void' to parameter expecting synchronous function type

I tried using an AsyncSequence instead of a continuation and it works, but since there is a 1:1 relationship between the invocation and the result, and because the server will always send me one event at a time, I feel this is an overkill.

What's the proper way to implement the above in Swift 6?

(Edit: What I wrote is wrong. Please disregard.)

Can you refactor your code to do all the async operations outside the closure? Like this:

try await outbound.get("/next")
let (headers, body) = try await inboundIterator.readFullResponse()
let invocation = await withCheckedContinuation { (continuation: CheckedContinuation<ByteBuffer, Never>) in
    Invocation(metadata: try InvocationMetadata(headers: headers), event: body, continuation: continuation)
}
// Correct? Your sample code doesn't make clear what gets assigned to response.
let response = await self.invocationsPool.push(invocation)
try await outbound.post("/response", body: response)

The semantics should be the same, I think.

Alternatively, you could use the continuation closure's body just to return the continuation and also pull out the creation of the Invocation:

let continuation: CheckedContinuation<ByteBuffer, Never> = await withCheckedContinuation { $0 }
let invocation = …

I don't think this will work as withCheckedContinuation only returns once the continuation has been resumed.

Instead you have to make self.invocationsPool.push() a synchronous method (eg a Mutex<Dequeue>). Then you can write:

try await outbound.get("/next")
let (headers, body) = try await inboundIterator.readFullResponse()
let metadata = try InvocationMetadata(headers: headers)
let response = await withCheckedContinuation { (continuation: CheckedContinuation<ByteBuffer, Never>) in
   let invocation = Invocation(metadata: metadata, event: body, continuation: continuation)
   self.invocationsPool.push(invocation)
   // somebody else will eventually resume the continuation
}

try await outbound.post("/response", body: response)

Yes, you're right of course. My (stupid) mistake, sorry.

Thank you booth.
@ole indeed. This solution does not work.
@t089 can you elaborate a bit ?

pool.push() is async because it’s a shared data structure already protected by a Mutex
How adding a second one will help ? Most importantly : access to the Mutex will be async, aren’t they ?

Nevermind.

Grazie mille

So very handwavy from the top of my head without taking cancellation into account and a bit shaky on the api details this is what I had in mind. You need to solve the problem from two sides. The pool could provide something like this.

class Pool: Sendable {
   struct State {
      var items : [Invocation] = []
      var waiters: [CheckedContinuation<Invocation, Never>] = []
   }

   enum Action {
     case resume(waiter: CheckedContinuation<Invocation, Never>, invocation: Invocation)
     case doNothing
   }

   var state: Mutex<State> = .init(items: [], waiters: [])

   // pushes an invocation to the pool or wakes up a waiter if there is one
   func push(_ item: Invocation) {
      let action = self.state.withLock { 
        if $0.waiters.count > 0 {
           // take waiter and give it the invocation
           return .resume(waiter: $0.waiters.removeFirst(), invocation: item)
        } else { 
           $0.items.append(item)
           return .doNothing
        }
      }
      switch action {
         case .resume(let waiter, let item):
            waiter.resume(item)
         case .doNothing: break
      }
   }


   // suspends until there is an invocation to process
   // the caller is REQUIRED to resume the continuation inside the invocation.
   func poll() async -> Invocation {
      return withCheckedContinuation { cont in 
         let item = self.state.withLock { 
            if $0.items.count > 0 {
               return $0.items.removeFirst()!
            } else {
               $0.waiters.append(cont)
               return nil
            }
         }
         if let item {
            cont.resume(item)
         }
      }
   }
}

Thank you Tobias.
This is indeed very closed to the solution I implemented

This looks good, but I think there might be a race between the two Mutexs:

Task 1 goes into push, does not find a continuation and is now in line 368 but before acquiring the _buffer lock.

At the same time Task 2 goes into next(), it acquires the _buffer lock, does not find anything in the buffer and leaves the lock again. Now Task 1 will acquire the _buffer lock and write the invocation into the buffer. Task 2 now acquires the _continuation lock and stores the continuation.

In the end you have one item in the buffer and a stored waiter continuation but they failed to meet each other. :broken_heart:

I think if you combine the _continuation and _buffer behind a single mutex, you can avoid this situation.

1 Like

Thank you @t089 for having spotted this.
Can you review and comment on this PR ?