Running an async task with a timeout

Doesn't seem to work,
You can try the following code:

 @Sendable func testAsync1() async -> String {
            print("start")
            var value = 0
            for _ in 0...30_000_000 {
                value += 1
            }
            print("end with value:\(value)")
            return "\(value)"
        }

        Task.detached {
            do {
                let favoriteNumber: String = try await async(timeoutAfter: 0.25) {
                    let res = await testAsync1()
                    return res
                }
                print("Favorite number: \(favoriteNumber)")
            } catch {
                print("Error: \(error)")
            }
        }

I have the same problem, and i do not clearly understand how to throw before the group ends..
is it possible?

Just noticed your response! If you are in a busy loop (i.e. not doing 'await' on network I/O or some such), then you need to periodically check if the Task is cancelled. I confirmed the following code works (with the code from my original post of 'withTimeout' included here as well for clarity):

func run() {
    Task.detached {
        do {
            let favoriteNumber: String = try await withTimeout(seconds: 0.25) {
                let res = try await testAsync1()
                return res
            }
            print("Favorite number: \(favoriteNumber)")
        } catch {
            print("Error: \(error)")
        }
    }
}

@Sendable func testAsync1() async throws -> String {
    print("start")
    var value = 0
    for _ in 0...30_000_000 {
        value += 1
        if value % 100_000 == 0 {
            try Task.checkCancellation()
        }
    }
    print("end with value:\(value)")
    return "\(value)"
}

///
/// Execute an operation in the current task subject to a timeout.
///
/// - Parameters:
///   - seconds: The duration in seconds `operation` is allowed to run before timing out.
///   - operation: The async operation to perform.
/// - Returns: Returns the result of `operation` if it completed in time.
/// - Throws: Throws ``TimedOutError`` if the timeout expires before `operation` completes.
///   If `operation` throws an error before the timeout expires, that error is propagated to the caller.
public func withTimeout<R>(
    seconds: TimeInterval,
    operation: @escaping @Sendable () async throws -> R
) async throws -> R {
    return try await withThrowingTaskGroup(of: R.self) { group in
        let deadline = Date(timeIntervalSinceNow: seconds)

        // Start actual work.
        group.addTask {
            let result = try await operation()
            try Task.checkCancellation()
            return result
        }
        // Start timeout child task.
        group.addTask {
            let interval = deadline.timeIntervalSinceNow
            if interval > 0 {
                try await Task.sleep(nanoseconds: UInt64(interval * 1_000_000_000))
            }
            try Task.checkCancellation()
            // We’ve reached the timeout.
            throw TimedOutError()
        }
        // First finished child task wins, cancel the other task.
        let result = try await group.next()!
        group.cancelAll()
        return result
    }
}
1 Like

Yes, that's right - for such long running work checking multiple times, though not on every iteration is how you'd deal with this.

This year's WWDC talk on structured concurrency covers some of the "why" here: Beyond the basics of structured concurrency - WWDC23 - Videos - Apple Developer

@Doug_Stein If you reach the timeout, the group will throw and the group.cancelAll() function will not be called. Also not sure if I'm missing something, but the Date conversion seems unnecessary. Here's a minor tweak:


public func withTimeout<R>(
    seconds: TimeInterval,
    operation: @escaping @Sendable () async throws -> R
) async throws -> R {
    return try await withThrowingTaskGroup(of: R.self) { group in
        defer {
            group.cancelAll()
        }
        
        // Start actual work.
        group.addTask {
            let result = try await operation()
            try Task.checkCancellation()
            return result
        }
        // Start timeout child task.
        group.addTask {
            if seconds > 0 {
                try await Task.sleep(nanoseconds: UInt64(seconds * 1_000_000_000))
            }
            try Task.checkCancellation()
            // We’ve reached the timeout.
            throw TimedOutError()
        }
        // First finished child task wins, cancel the other task.
        let result = try await group.next()!
        return result
    }
}

I would not recommend checking cancellation AFTER the work was already completed. It's a bit of a waste of computation -- cancellation is useful to avoid un-necessary work. But if the work was already done... there's not much reason to discard the result.

If someone wanted to discard the result they can do so on the calling function still.

I didn't write that part of the code. But I suppose it depends on the use-case. For myself, I was using this function in unit tests where I wanted to ensure things happened within a certain time frame. So the fact that it times out before getting the result was actually important.

I am trying to run some async function from a library, and it sometimes takes too long.
I tried to implement timeout mechanism with task groups. However, because the function does not handle cancellation request, and task group waits all children tasks, timeout mechanism does not work.

I've written a piece of code for race and timeout mechanism.

Race imitates Promise.race from Javascript. It accepts one or more async operation and returns the value of the one which completes first. Then it cancels all other operations. I used withTaskCancellationHandler, withCheckedThrowingContinuation and actors to achive it.

The timeout mechanism simply uses race mechanism in which one of the operations just sleeps and throws a timeout error.

I am not fully confident about my race mechanism. Do you think it has flaws and edge cases?

public func withTimeout<R>(interval: TimeInterval, operation: @escaping @Sendable () async throws -> R) async throws -> R {
    try await race(
        operation,
        {
            try await Task.sleep(nanoseconds: UInt64(interval * 1_000_000_000))
            throw WithTimeoutError.timeout
        }
    )
}

public enum WithTimeoutError: Error {
    case timeout
}

public func race<R>(_ operations: (@Sendable () async throws -> R)...) async throws -> R {
    let raceContinuationManager = RaceContinuationManager<R>()
    return try await withTaskCancellationHandler {
        try await withCheckedThrowingContinuation { continuation in
            raceContinuationManager.set(
                continuation: continuation,
                operationTasks: operations.map { operation in
                    .detached {
                        do {
                            let result = try await operation()
                            raceContinuationManager.set(result: .result(result))
                        } catch {
                            raceContinuationManager.set(result: .error(error))
                        }
                    }
                }
            )
        }
    } onCancel: {
        raceContinuationManager.set(result: .cancel)
    }
}


private actor RaceContinuationManager<R> {
    enum ContinuationResult<RR> {
        case result(RR)
        case error(Error)
        case cancel
    }

    var isContinued: Bool = false

    var result: ContinuationResult<R>? {
        didSet {
            if self.result != nil {
                self.operationTasks.forEach { $0.cancel() }
                self.runContinuationIfRequired()
            }
        }
    }

    private var continuation: CheckedContinuation<R, Error>? {
        didSet {
            if self.continuation != nil {
                self.runContinuationIfRequired()
            }
        }
    }

    private var operationTasks: [Task<Void, Error>] = [] {
        didSet {
            if self.result != nil {
                self.operationTasks.forEach { $0.cancel() }
            }
        }
    }

    private func runContinuationIfRequired() {
        guard let continuation, let result, !isContinued else { return }
        switch result {
        case .result(let result):
            continuation.resume(returning: result)
        case .error(let error):
            continuation.resume(throwing: error)
        case .cancel:
            continuation.resume(throwing: CancellationError())
        }
        self.isContinued = true
    }

    nonisolated func set(continuation: CheckedContinuation<R, Error>, operationTasks: [Task<Void, Error>]) {
        SafeTask {
            await self.setInner(continuation: continuation, operationTasks: operationTasks)
        }
    }

    private func setInner(continuation: CheckedContinuation<R, Error>, operationTasks: [Task<Void, Error>]) {
        self.continuation = continuation
        self.operationTasks = operationTasks
    }

    nonisolated func set(result: ContinuationResult<R>) {
        SafeTask {
            await self.setInner(result: result)
        }
    }

    private func setInner(result: ContinuationResult<R>) {
        if self.result != nil {
            return
        }
        self.result = result
    }
}

Of course, you can add withTaskCancellationHandler inside withTimeout.
But probably you can leave it to the operation to decide how the cancellation needs to be handled.

You can pass your non-supporting operation wrapped with withTaskCancellationHandler and then pass to withTimeout.

Also, handling cancellations with an actor is probably not what you should do.
Look at the thread How to use withTaskCancellationHandler properly? which extensively tries to exhaust this topic.