Concurrency: Hang when continuations resumed in Task, but not in actor

I'm attempting to recreate / rewrite some parts of Alamofire with the new concurrency system. I've replaced my classes with actors and have gotten simple calls working. However, I need a solution where callers can await the response of the network call. Currently, I'm creating a continuation and storing a closure to resume it, and putting that work inside of a handle I can await for the caller.

await async {
    await withCheckedContinuation { continuation in
        storage.waiters.append({
            continuation.resume(returning: self.storage.data)
        })
    }
}.get()

When the request completes, I call the closures in the waiters array. Calling them directly from an isolated actor method that's being awaited elsewhere works fine.

storage.waiters.forEach { $0() }

However, simply wrapping that call into a Task seems to create a hang where the continuation never completes the originally awaited handle.

async {
    storage.waiters.forEach { $0() }
}

I can see the closure being called to resume the continuation, but the outer await never completes. Perhaps I'm deadlocking because it's two Tasks, rather than one Task and an actor method? Any debugging suggestions? Or perhaps a different approach to allow clients to wait for the request to finish?
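One alternative worth sketching is to store the continuations themselves rather than closures that capture them, and to return immediately when the result already exists. This is only a minimal sketch under stated assumptions: `RequestBox`, `value()`, and `finish(with:)` are hypothetical names, not Alamofire API, and it uses the `Task`-era spelling of the concurrency APIs rather than the beta 1 `async {}` spelling.

```swift
import Foundation

// Hypothetical sketch: `RequestBox` and its members are illustrative names only.
actor RequestBox {
    private var data: Data?
    private var waiters: [CheckedContinuation<Data, Never>] = []

    /// Suspends the caller until `finish(with:)` has been called.
    func value() async -> Data {
        // If the result already exists, return it without suspending.
        if let data = data { return data }
        return await withCheckedContinuation { continuation in
            // Runs synchronously on the actor, so mutating `waiters` is safe here.
            waiters.append(continuation)
        }
    }

    /// Stores the result and resumes every pending waiter exactly once.
    func finish(with newData: Data) {
        data = newData
        let pending = waiters
        waiters.removeAll()
        for continuation in pending {
            continuation.resume(returning: newData)
        }
    }
}
```

Clearing the array before resuming guarantees that each continuation is resumed only once, which checked continuations require.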

I can't tell enough from your code, but could this be another case like the one I describe in this post: Structured concurrency deadlock help needed?

The other thing I wondered is whether using detach instead of async at any point makes a useful difference in your case. I'm not sure from the context of your overall flow whether it is necessary (and as you can see from my other post, I am not sure I have a strong grip on when it is and isn't necessary).

It may be that neither of these things apply but I thought I should mention what came to mind.

Yes, a detached task works just fine. I guess the attached version leads to a deadlock, but I'm not quite sure why. I suppose it depends on how a continuation actually triggers its waiters. I guess the continuation's waiter tries to complete its task, but since another task is executing, it can't? I'm not really sure how that would be reentrant, since the waiter is outside the actor.

Another thing I noticed: the async call in my original continuation implementation is unnecessary. I can simply await the continuation directly and waiters will be called when it's resumed. It still deadlocks when resuming the continuation from an attached task.

My current (well, previous, since it's obviously incorrect) mental model was that async (or Task {}) was equivalent to actorQueue.async, whereas asyncDetached was equivalent to globalQueue.async. That's incomplete without consideration of task suspension, but I'm not sure how to bridge the two together.
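The part of that mental model that does hold can be shown in a few lines. The sketch below assumes the later spelling of these APIs, where `async {}` became `Task {}` and `asyncDetached {}` became `Task.detached {}`; `Counter` and its methods are hypothetical names used only for illustration.

```swift
// Hypothetical sketch: `Counter` is an illustrative actor, not code from the thread.
actor Counter {
    private var value = 0

    func incrementLater() -> Task<Int, Never> {
        // Task { } inherits this actor's isolation, so `value` can be
        // read and written here without `await`.
        return Task {
            value += 1
            return value
        }
    }

    func incrementDetached() -> Task<Int, Never> {
        // A detached task does not inherit isolation. It starts off the actor
        // and has to hop back with `await` to touch actor state.
        return Task.detached {
            await self.increment()
        }
    }

    private func increment() -> Int {
        value += 1
        return value
    }
}
```

What the queue analogy leaves out is suspension: a task that awaits gives up the actor instead of blocking it, so other work enqueued on the actor can run in the meantime.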

Is this async/await call made from within an actor? I think if it is, then it takes place on the actor, and it might need to be detached.

That said I do wonder if there is actually a bug currently in the beta software affecting both of us and leading to deadlocks when they shouldn't be happening (it is also quite possible that I don't yet properly understand structured concurrency).

It seems like the awaits on the actor are blocking other access to the actor rather than suspending without holding a lock (which is what I expected) when they can't immediately proceed.
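For comparison, this is the behavior the concurrency model is meant to provide: a method suspended at an `await` releases the actor, so another call can run and resume it. The sketch is an assumption-laden illustration, not a reproduction of either poster's code; `Gate`, `wait()`, and `open()` are hypothetical names.

```swift
// Hypothetical sketch: `Gate` illustrates actor reentrancy at a suspension point.
actor Gate {
    private var pending: CheckedContinuation<Void, Never>?
    private(set) var log: [String] = []

    /// Suspends until `open()` is called on the same actor.
    func wait() async {
        log.append("wait started")
        // The closure runs synchronously on the actor, then the method suspends
        // and the actor becomes free to run other calls.
        await withCheckedContinuation { pending = $0 }
        log.append("wait resumed")
    }

    /// Can run while `wait()` is suspended, because suspension does not hold the actor.
    func open() {
        log.append("open called")
        pending?.resume()
        pending = nil
    }
}
```

If `open()` could never run while `wait()` is suspended, that would point to a runtime problem rather than to the design of the code.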

Yes, it's within a single actor.

After rereading the continuations and structured concurrency proposal, I'm still not sure what the problem might be. As I understand it, this is the sequence of events.

  1. My actor, DataRequest, is created by Session, another actor. Session then creates a partial task to resume the request and then synchronously returns the DataRequest.
@discardableResult
nonisolated
func request(_ urlRequest: URLRequest) -> DataRequest {
    let request = DataRequest(.init(urlRequestConvertible: urlRequest, dataTaskProvider: self, requestModifiers: []))
    async {
        await request.resume()
    }
    return request
}
  2. Outside the DataRequest (in this case in a test), my continuation method is called to await the request's Data.
func data() async -> Data {
    await withCheckedContinuation { continuation in
        storage.waiters.append({
            continuation.resume(returning: self.storage.data)
        })
    }
}
  3. Concurrently, the partial task created in the Session.request method will execute DataRequest.resume(). This method attempts to stay synchronous by creating an unstructured task for the work needed to start a request. (Aside: It's probably an issue that I can create a partial task without handling the errors produced like I would without the partial task.) This work is executed but immediately suspends to await the urlRequest value.
func resume() {
    guard state.canTransition(to: .resumed) else { return }
    
    if state == .initialized {
        async {
            let urlRequest = try await configuration.urlRequestConvertible.urlRequest
            let dataTask = await configuration.dataTaskProvider.dataTask(with: urlRequest, for: self)
            storage.tasks.append(dataTask)
            dataTask.resume()
            state = .resumed
        }
    } else {
        storage.tasks.last?.resume()
        state = .resumed
    }
}
  4. The unstructured task started in 3 eventually completes, appending the task to storage, resuming the task, and updating the DataRequest.state to .resumed. At this point the task is executing and I'm getting delegate callbacks. I'll skip to the completion event.
  5. My URLSessionTaskDelegate receives didCompleteWithError and I create an unstructured task to call back to the DataRequest for the completed task. Since this is a nonisolated delegate method, it executes on the global concurrent executor.
nonisolated func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
    async {
        await self.session?.dataTaskMap[task]?.didComplete(producingError: error)
    }
}
  6. DataRequest.didComplete is called, which is where the ultimate issue occurs. I store any passed error, transition to the finishing state, and call all of the enqueued waiters. In this case, it's just the one created in the test. Then the state is set to finished.
func didComplete(producingError error: Error?) {
    storage.underlyingError = error
    
    guard state.canTransition(to: .finishing) else { return }
    
    state = .finishing
    
    // Start response handling.
    //async {
        storage.waiters.forEach { $0() }
    //}
    guard state.canTransition(to: .finished) else { return }
    
    state = .finished
}
  7. In the synchronous case, the closure for the continuation is called, resuming it with the stored Data. The continuation transitions from the suspended to the scheduled state but isn't immediately executed since the didComplete method is still executing. Once didComplete finishes, the continuation is executed, data() executes, and the waiter receives its Data.
  8. In the async case, according to the proposal, "[t]he initializer creates a new task that begins executing the provided closure". Now, this seems to be saying the closure should execute immediately. If that's the case, the execution should match the sync case since there are no suspensions within the closure. However, that's not what I observe. Adding print statements within the async call indicates it's actually executed after didComplete has completed and the state has been set to finished. This is actually what I'd expect, but doesn't match what I interpret the proposal to mean. In any case, the continuation closure is called and then the hang occurs.
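The ordering observed in the last step can be reproduced in isolation. In this sketch, which assumes the later `Task {}` spelling and uses a hypothetical `Recorder` actor, the unstructured task inherits the actor's isolation, so its body is enqueued behind the method that created it and cannot start until that method returns or suspends.

```swift
// Hypothetical sketch: `Recorder` stands in for DataRequest to show ordering only.
actor Recorder {
    private(set) var events: [String] = []

    func didComplete() -> Task<Void, Never> {
        events.append("didComplete: start")
        let task = Task {
            // Inherits the actor's isolation, so this body waits for the
            // currently running actor method to finish.
            events.append("task body")
        }
        // No suspension point occurs above, so this line runs before the task body.
        events.append("didComplete: end")
        return task
    }
}
```

That matches the print-statement observation above: the task body sees the state as it is after didComplete has finished, not as it was when the task was created.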

In the course of writing this all up, I actually fixed the issue by adding a print statement at the end of didComplete. Simply adding that statement got both the sync and async versions to work. This is definitely a bug. As far as I can tell, the didComplete method actually finishes executing, as the execution of the unstructured task picks up the .finished state.

I've created SR-14802 to track this bug.

This seems to have gotten worse in beta 2. Now nothing works unless I put a print statement in my request method, as without it the resume() method is never called and none of the later work is kicked off.
