Running an async task with a timeout

Doesn't seem to work,
You can try the following code:

 @Sendable func testAsync1() async -> String {
            print("start")
            var value = 0
            for _ in 0...30_000_000 {
                value += 1
            }
            print("end with value:\(value)")
            return "\(value)"
        }

        Task.detached {
            do {
                let favoriteNumber: String = try await async(timeoutAfter: 0.25) {
                    let res = await testAsync1()
                    return res
                }
                print("Favorite number: \(favoriteNumber)")
            } catch {
                print("Error: \(error)")
            }
        }

I have the same problem, and i do not clearly understand how to throw before the group ends..
is it possible?

Just noticed your response! If you are in a busy loop (i.e. not doing 'await' on network I/O or some such), then you need to periodically check if the Task is cancelled. I confirmed the following code works (with the code from my original post of 'withTimeout' included here as well for clarity):

func run() {
    Task.detached {
        do {
            let favoriteNumber: String = try await withTimeout(seconds: 0.25) {
                let res = try await testAsync1()
                return res
            }
            print("Favorite number: \(favoriteNumber)")
        } catch {
            print("Error: \(error)")
        }
    }
}

@Sendable func testAsync1() async throws -> String {
    print("start")
    var value = 0
    for _ in 0...30_000_000 {
        value += 1
        if value % 100_000 == 0 {
            try Task.checkCancellation()
        }
    }
    print("end with value:\(value)")
    return "\(value)"
}

///
/// Execute an operation in the current task subject to a timeout.
///
/// - Parameters:
///   - seconds: The duration in seconds `operation` is allowed to run before timing out.
///   - operation: The async operation to perform.
/// - Returns: Returns the result of `operation` if it completed in time.
/// - Throws: Throws ``TimedOutError`` if the timeout expires before `operation` completes.
///   If `operation` throws an error before the timeout expires, that error is propagated to the caller.
public func withTimeout<R>(
    seconds: TimeInterval,
    operation: @escaping @Sendable () async throws -> R
) async throws -> R {
    return try await withThrowingTaskGroup(of: R.self) { group in
        let deadline = Date(timeIntervalSinceNow: seconds)

        // Start actual work.
        group.addTask {
            let result = try await operation()
            try Task.checkCancellation()
            return result
        }
        // Start timeout child task.
        group.addTask {
            let interval = deadline.timeIntervalSinceNow
            if interval > 0 {
                try await Task.sleep(nanoseconds: UInt64(interval * 1_000_000_000))
            }
            try Task.checkCancellation()
            // We’ve reached the timeout.
            throw TimedOutError()
        }
        // First finished child task wins, cancel the other task.
        let result = try await group.next()!
        group.cancelAll()
        return result
    }
}
1 Like

Yes, that's right - for such long running work checking multiple times, though not on every iteration is how you'd deal with this.

This year's WWDC talk on structured concurrency covers some of the "why" here: Beyond the basics of structured concurrency - WWDC23 - Videos - Apple Developer

@Doug_Stein If you reach the timeout, the group will throw and the group.cancelAll() function will not be called. Also not sure if I'm missing something, but the Date conversion seems unnecessary. Here's a minor tweak:


public func withTimeout<R>(
    seconds: TimeInterval,
    operation: @escaping @Sendable () async throws -> R
) async throws -> R {
    return try await withThrowingTaskGroup(of: R.self) { group in
        defer {
            group.cancelAll()
        }
        
        // Start actual work.
        group.addTask {
            let result = try await operation()
            try Task.checkCancellation()
            return result
        }
        // Start timeout child task.
        group.addTask {
            if seconds > 0 {
                try await Task.sleep(nanoseconds: UInt64(seconds * 1_000_000_000))
            }
            try Task.checkCancellation()
            // We’ve reached the timeout.
            throw TimedOutError()
        }
        // First finished child task wins, cancel the other task.
        let result = try await group.next()!
        return result
    }
}

I would not recommend checking cancellation AFTER the work was already completed. It's a bit of a waste of computation -- cancellation is useful to avoid un-necessary work. But if the work was already done... there's not much reason to discard the result.

If someone wanted to discard the result they can do so on the calling function still.

1 Like

I didn't write that part of the code. But I suppose it depends on the use-case. For myself, I was using this function in unit tests where I wanted to ensure things happened within a certain time frame. So the fact that it times out before getting the result was actually important.

I am trying to run some async function from a library, and it sometimes takes too long.
I tried to implement timeout mechanism with task groups. However, because the function does not handle cancellation request, and task group waits all children tasks, timeout mechanism does not work.

I've written a piece of code for race and timeout mechanism.

Race imitates Promise.race from Javascript. It accepts one or more async operation and returns the value of the one which completes first. Then it cancels all other operations. I used withTaskCancellationHandler, withCheckedThrowingContinuation and actors to achive it.

The timeout mechanism simply uses race mechanism in which one of the operations just sleeps and throws a timeout error.

I am not fully confident about my race mechanism. Do you think it has flaws and edge cases?

public func withTimeout<R>(interval: TimeInterval, operation: @escaping @Sendable () async throws -> R) async throws -> R {
    try await race(
        operation,
        {
            try await Task.sleep(nanoseconds: UInt64(interval * 1_000_000_000))
            throw WithTimeoutError.timeout
        }
    )
}

public enum WithTimeoutError: Error {
    case timeout
}

public func race<R>(_ operations: (@Sendable () async throws -> R)...) async throws -> R {
    let raceContinuationManager = RaceContinuationManager<R>()
    return try await withTaskCancellationHandler {
        try await withCheckedThrowingContinuation { continuation in
            raceContinuationManager.set(
                continuation: continuation,
                operationTasks: operations.map { operation in
                    .detached {
                        do {
                            let result = try await operation()
                            raceContinuationManager.set(result: .result(result))
                        } catch {
                            raceContinuationManager.set(result: .error(error))
                        }
                    }
                }
            )
        }
    } onCancel: {
        raceContinuationManager.set(result: .cancel)
    }
}


private actor RaceContinuationManager<R> {
    enum ContinuationResult<RR> {
        case result(RR)
        case error(Error)
        case cancel
    }

    var isContinued: Bool = false

    var result: ContinuationResult<R>? {
        didSet {
            if self.result != nil {
                self.operationTasks.forEach { $0.cancel() }
                self.runContinuationIfRequired()
            }
        }
    }

    private var continuation: CheckedContinuation<R, Error>? {
        didSet {
            if self.continuation != nil {
                self.runContinuationIfRequired()
            }
        }
    }

    private var operationTasks: [Task<Void, Error>] = [] {
        didSet {
            if self.result != nil {
                self.operationTasks.forEach { $0.cancel() }
            }
        }
    }

    private func runContinuationIfRequired() {
        guard let continuation, let result, !isContinued else { return }
        switch result {
        case .result(let result):
            continuation.resume(returning: result)
        case .error(let error):
            continuation.resume(throwing: error)
        case .cancel:
            continuation.resume(throwing: CancellationError())
        }
        self.isContinued = true
    }

    nonisolated func set(continuation: CheckedContinuation<R, Error>, operationTasks: [Task<Void, Error>]) {
        SafeTask {
            await self.setInner(continuation: continuation, operationTasks: operationTasks)
        }
    }

    private func setInner(continuation: CheckedContinuation<R, Error>, operationTasks: [Task<Void, Error>]) {
        self.continuation = continuation
        self.operationTasks = operationTasks
    }

    nonisolated func set(result: ContinuationResult<R>) {
        SafeTask {
            await self.setInner(result: result)
        }
    }

    private func setInner(result: ContinuationResult<R>) {
        if self.result != nil {
            return
        }
        self.result = result
    }
}

Of course, you can add withTaskCancellationHandler inside withTimeout.
But probably you can leave it to the operation to decide how the cancellation needs to be handled.

You can pass your non-supporting operation wrapped with withTaskCancellationHandler and then pass to withTimeout.

Also, handling cancellations with an actor is probably not what you should do.
Look at the thread How to use withTaskCancellationHandler properly? which extensively tries to exhaust this topic.

Since this is seemingly the main reference that pops up in web searches on this topic, I thought I'd add my own tidbit of experience here. The quirk I ran up against was that the thing I was waiting on a timeout on was an external API that I was calling that would basically never return in certain circumstances (PHImageManager.requestImage() if you're curious). This meant that once invoked, I had no way to check for Task cancellation myself, so even if the timeout triggered and cancelled the task group, my work task would still sit there forever since it had no opportunity to see that it had been cancelled.

My solution was to add an additional timeoutHandler closure parameter to Ole's implementation, and then call timeoutHandler() just before throwing the timeout error. That would give my code the chance to call the corresponding cancellation API (PHImageManager.cancelImageRequest()) for the external thing I was waiting on, thus letting it

So the function declaration changes to:

func `async`<R>(
  timeoutAfter maxDuration: TimeInterval,
  do work: @escaping () async throws -> R,
  timeoutHandler: (@escaping () -> Void)? = nil

And then the timeout task changes to:

    // Start timeout child task.
    group.async {
      await Task.sleep(UInt64(maxDuration * 1_000_000_000))
      try Task.checkCancellation()
      // We’ve reached the timeout.
      timeoutHandler?()
      throw TimedOutError()
    }
2 Likes

Nice addition, thanks!

Could you elaborate a bit more, why you need the extra closure? Naively, since I am not familiar with this API, I would‘ve just used withTaskCancellationHandler like this:

extension PHImageManager {
    
    func requestImage(
        for asset: PHAsset,
        targetSize: CGSize,
        contentMode: PHImageContentMode,
        options: PHImageRequestOptions?
    ) async throws -> UIImage {
        let requestId = LockIsolated<PHImageRequestID?>(nil)
        return try await withTaskCancellationHandler {
           return try await withUnsafeThrowingContinuation { continuation in
               let newRequestId = requestImage(for: asset, targetSize: targetSize, contentMode: contentMode, options: options) { image, _ in
                   if let image {
                       continuation.resume(returning: image)
                   } else {
                       continuation.resume(throwing: CancellationError())
                   }
               }
               requestId.setValue(newRequestId)
            }
        } onCancel: {
            requestId.withValue { requestId in
                if let requestId {
                    cancelImageRequest(requestId)
                }
            }
        }
    }
}

LockIsolated is a helper from swift-concurrency-extras but I‘d imagine you could use Mutex instead, when it is released.

Edit: Please do not use this snippet, it is just for demonstration purposes. It will crash in some conditions because the requestImage closure can be called multiple times.

I tried using withTaskCancellationHandler but could never get my handler to be called, though it's certainly possible I was holding it wrong. Adding the separate timeout handler gave me a concrete way to cancel out of the image request and seemed to do the trick.

This timeout is excellent for continueous sequencing in background!

@lyzkov
Most (if not all) of the snippets posted in this thread have some rough edges (though I am not criticizing any solution, since some things were just not available back then):

  • Using Foundation is not necessary (anymore). You can use Task.sleep(until:tolerance:clock:).
  • R should conform to Sendable. This is a requirement of TaskGroup and it warns you if you use strict concurrency.
  • If you cancel the Task in which withTimeout was started it could swallow the cancellation behavior of operation and just throw CancellationError from the Task.sleep because TaskGroup will forward the first error it receives from its children.
  • The try Task.checkCancellation() is awkward, as @ktoso already mentioned here:
1 Like

According to your priceless suggestion to use most recent API for task sleep I've implemented improved variants for continuous clock (realtime scheduling) and suspending clock (cputime scheduling).

import Foundation.NSDate // for TimeInterval

// Based on: https://forums.swift.org/t/running-an-async-task-with-a-timeout/49733/21
public struct TimedOutError: Error, Equatable {}

/// Execute an operation in the current task subject to a timeout.
///
/// - Parameters:
///   - timeout: The time duration in which `operation` is allowed to run before timing out.
///   - tolerance: The time duriation that is allowed for task scheduler to delay operation timeout
///   in case of computationaly sparse resource.
///   - clock: The clock which is suitable for task scheduling.
///   - operation: The asynchronous operation to perform.
/// - Returns: The result of `operation` if it completed in time.
/// - Throws: Throws ``TimedOutError`` if the timeout expires before `operation` completes.
///   If `operation` throws an error before the timeout expires, that error is propagated to the caller.
public func with<Return: Sendable, C: Clock>(
    timeout: C.Instant.Duration,
    tolerance: C.Instant.Duration? = nil,
    clock: C,
    operation: @escaping @Sendable () async throws -> Return
) async rethrows -> Return {
    try await withThrowingTaskGroup(of: Return.self) { group in
        let expiration: C.Instant = .now.advanced(by: timeout)
        defer {
            group.cancelAll() // cancel the other task
        }
        group.addTask {
            try await Task.sleep(
                until: expiration,
                tolerance: tolerance,
                clock: clock
            ) // sleep supports cancellation
            throw TimedOutError() // timeout has been reached
        }
        group.addTask {
            try await operation()
        }
        // first finished child task wins
        return try await group.next()! // never fails
    }
}

/// Execute an operation in the current task subject to a timeout with continuous clock
/// suitable for realtime task scheduling.
///
/// - Parameters:
///   - timeout: The time duration in which `operation` is allowed to run before timing out.
///   - tolerance: The time duriation that is allowed for task scheduler to delay operation timeout
///   in case of computationaly sparse resource.
///   - operation: The asynchronous operation to perform.
/// - Returns: The result of `operation` if it completed in time.
/// - Throws: Throws ``TimedOutError`` if the timeout expires before `operation` completes.
///   If `operation` throws an error before the timeout expires, that error is propagated to the caller.
public func with<Return: Sendable>(
    timeout: ContinuousClock.Instant.Duration,
    tolerance: ContinuousClock.Instant.Duration? = nil,
    operation: @escaping @Sendable () async throws -> Return
) async rethrows -> Return {
    try await with(
        timeout: timeout,
        tolerance: tolerance,
        clock: .continuous,
        operation: operation
    )
}

fileprivate extension InstantProtocol {
    static var now: Self {
        switch Self.self {
        case is ContinuousClock.Instant.Type:
            ContinuousClock.Instant.now as! Self
        case is SuspendingClock.Instant.Type:
            SuspendingClock.Instant.now as! Self
        default:
            fatalError("Not implemented")
        }
    }
}

Please suggest some improvements if you have any idea how to deal with InstantProtocol.now. Feel free to test it in your codebase.

1 Like

I‘ve open-sourced my implementation a couple of days ago. You can check it out here: GitHub - ph1ps/swift-concurrency-deadline: A deadline algorithm for Swift Concurrency.

It should (hopefully) cover all edge cases.

5 Likes

With Swift 6 you can inherit isolation so the closure can be executed within it sidestepping @Sendable.

In swift-timeout I have something like this:

func withThrowingTimeout<T>(
    isolation: isolated (any Actor)? = #isolation,
    after instant: ContinuousClock.Instant,
    body: () async throws -> sending T
) async throws -> sending T
1 Like