Async/Await - multiple function calls, single execution

Hi everyone. I would like to implement a function using async/await that executes only once when multiple calls occur at "the same time". For example, the function takes 1 s to finish; one piece of code calls it and it starts executing, then 100 ms later another piece of code calls the same function. I want to store the second caller's callback and pass it the function's result without executing the code again.

This is the code I came up with (I want the serverCall function to be executed only once):

actor Manager {
    private var continuations: [CheckedContinuation<String, Never>] = []
    private var isPending: Bool = false
    
    public func getDataFromServer() async -> String {
        if isPending == false {
            isPending = true
            
            let result = await serverCall() // 1 - suspend

            continuations.forEach { $0.resume(returning: result) } // 4 - resume all waiting continuations
            continuations.removeAll()
            isPending = false
            
            return result
            
        } else {
            // 2 - second call goes here because isPending == true
            return await withCheckedContinuation { continuation in // 3 - suspend
                continuations.append(continuation) // 5 - add continuation, but too late: all other continuations were already resumed, so this one stays in the array forever (unless 'getDataFromServer' is called again)
            }
        }
    }
    
    private func serverCall() async -> String {
        try! await Task.sleep(nanoseconds: 1_000_000_000) // simulate a 1 s request
        return "success"
    }
}

What bothers me is saving the continuation in an array. To get a continuation object, 'withCheckedContinuation' must be called, which can suspend and lead to the unwanted result described in the comments, steps 1 to 5.

Am I right in my way of thinking?

I tried to break this code with thousand concurrent calls but I always get correct result.

The documentation for 'withCheckedContinuation' says "Suspends the current task, then calls the given closure...", but it does not look like it behaves that way.

If I'm wrong and the correct execution is just a fluke or the result of an incorrect testing method, is there any way to implement this kind of function using async/await? Or do I have to wait for non-reentrant actors?

Thank you :)
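
One commonly used way to get "many callers, single execution" without storing continuations by hand is to keep the in-flight `Task` in the actor and have later callers await its `value`. The following is only a minimal sketch under assumptions: `DedupedManager`, `inFlight`, and `serverCallCount` are hypothetical names that do not appear in the original code, and the 200 ms sleep merely stands in for a real request.

```swift
actor DedupedManager {
    // The request currently in progress, if any (hypothetical name).
    private var inFlight: Task<String, Never>?
    // Instrumentation only: counts how often serverCall() actually ran.
    private(set) var serverCallCount = 0

    func getDataFromServer() async -> String {
        // There is no suspension point between this check and the
        // assignment below, so the two steps are atomic on the actor.
        if let task = inFlight {
            return await task.value // join the request that is already running
        }

        let task = Task { await self.serverCall() }
        inFlight = task

        let result = await task.value
        inFlight = nil // only the caller that created the task clears it
        return result
    }

    private func serverCall() async -> String {
        serverCallCount += 1
        try? await Task.sleep(nanoseconds: 200_000_000) // stand-in for a real request
        return "success"
    }
}
```

Callers that arrive while the task is running all receive the same result, and a call that arrives after `inFlight` has been cleared starts a fresh request.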

As far as I understand, with SE-0338 soon being implemented, this can break in practice, because withCheckedContinuation (a non-actor-isolated async function) will no longer inherit the caller's actor executor. Since this creates a possibility of dispatch (due to an executor switch), timings can be off, and some continuations may be left dangling indefinitely until someone eventually calls getDataFromServer() again.

A more robust approach is perhaps to model each outstanding request as an object that holds either the result or any awaiting continuations. Then, when the new runtime behaviour goes into effect, the body of withCheckedContinuation can still recover the result even if it executes later than it does currently.

An example:

The outstanding request object

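import Foundation // for NSLock

final class OutstandingRequest<T, E: Error>: @unchecked Sendable {
    let lock = NSLock()
    // Must be `var`: the state moves from `.awaiting` to `.completed`. Guarded by `lock`.
    var state = State.awaiting([])

    enum State {
        case completed(Result<T, E>)
        case awaiting([CheckedContinuation<T, E>])
    }
}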

where the actor holds a reference to the outstanding request object:

actor Manager {
    var current: OutstandingRequest<String, Never>? = nil
}

and the request deduplication logic goes:

// Manager.getDataFromServer() async -> String

// Check whether there is active work — this is actor isolated.
if let request = self.current {
    return await withCheckedContinuation { continuation in
        // Extra locking on `OutstandingRequest` is needed, since we are accessing it
        // from this non-actor isolated scope, which can be executed in parallel to the
        // actor after SE-0338.
        request.lock.lock()
        defer { request.lock.unlock() }

        switch request.state {
        case let .completed(result):
            continuation.resume(with: result)
        case let .awaiting(continuations):
            request.state = .awaiting(continuations + [continuation])
        }
    }
} else {
    let request = OutstandingRequest<String, Never>()
    self.current = request

    let result = await serverCall()
    self.current = nil

    request.lock.lock()
    let oldState = request.state
    request.state = .completed(.success(result)) // `result` is a plain String here
    request.lock.unlock()

    switch oldState {
    case let .awaiting(continuations):
        continuations.forEach { $0.resume(returning: result) }
    case .completed:
        preconditionFailure("Not supposed to happen.")
    }

    return result
}

The function passed to withCheckedContinuation is isolated to the current executor (because it's not Sendable) and is meant to be executed without any sort of suspension. There's a bug with that on top-of-tree, which was indeed introduced by SE-0338, and which we're going to have to fix by making withCheckedContinuation an exception to SE-0338 so that it inherits its caller's executor. (This sort of thing was listed as a future direction of SE-0338, but sometimes the future comes faster than we expect. :)) So, in brief, no, you shouldn't need to work around that.
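
To illustrate the point above, here is a small sketch of a closure passed to withCheckedContinuation that touches actor state directly, with no `await`. The `Gate` actor and its `wait()` and `open()` methods are hypothetical names made up for this example. It relies on the behaviour described in this reply: the closure runs on the actor with no suspension between the `isOpen` check and the `append`.

```swift
actor Gate {
    private var isOpen = false
    private var waiters: [CheckedContinuation<Void, Never>] = []

    /// Returns once the gate has been opened.
    func wait() async {
        if isOpen { return }
        await withCheckedContinuation { continuation in
            // This closure is isolated to the actor, so it can mutate
            // `waiters` synchronously, without an `await`.
            waiters.append(continuation)
        }
    }

    /// Opens the gate and resumes every caller currently waiting.
    func open() {
        isOpen = true
        let pending = waiters
        waiters.removeAll()
        pending.forEach { $0.resume() }
    }
}
```

Under that assumption a waiter either registers before `open()` runs and is resumed by it, or it sees `isOpen == true` and returns immediately, so no continuation is left behind.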


Thank you @Anders_Ha and @John_McCall, now it's clear :)

@John_McCall

After reading proposal SE-0338, I'm wondering whether this statement from proposal SE-0304 is still relevant:

"A closure passed to the Task initializer will implicitly inherit the actor execution context and isolation of the context in which the closure is formed."

I was trying to find information about known bugs or planned changes to accepted proposals, as in the case of withCheckedContinuation. I looked in "Proposal Reviews" and "Announcements" for SE-0338, but no luck.
Where should I look to stay up to date? Is there some sort of single source of truth?

Thanks!

Closures passed to the Task initializer are special in that they pick up the (static) isolation of the surrounding code. This is a different kind of inheritance than the dynamic, just-keeps-running “inheritance” done by non-actor async functions prior to SE-0338, and it remains in effect.
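
A short sketch of what this static isolation means in practice, using a hypothetical `Counter` actor (all names here are illustrative): a closure passed to `Task { }` inside an actor method may touch actor state without `await`, while a `Task.detached` closure does not inherit the isolation and has to hop back onto the actor.

```swift
actor Counter {
    private var value = 0

    func increment() -> Int {
        value += 1
        return value
    }

    func incrementInTask() -> Task<Int, Never> {
        Task {
            // Formed in an actor-isolated method, so the closure is
            // statically isolated to this actor: no `await` is needed.
            self.value += 1
            return self.value
        }
    }

    func incrementDetached() -> Task<Int, Never> {
        Task.detached {
            // A detached task does not inherit the actor's isolation,
            // so reaching actor state requires an `await`.
            await self.increment()
        }
    }
}
```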

As for how the continuation functions opt out of SE-0338, we haven’t figured out what we want to propose yet.

Got it!
Thanks:)