Is this a good implementation of `withTimeout`?

In my app, I’m calling a system async API that sometimes seems to hang[1]. It never returns, and does not respond to cancellation. I want to add a timeout handler to this API to deal with the possibility of failure, so that I can write something like this:

let x = try await withTimeout(.seconds(5)) {
  try await thingThatMightHang()
}

My initial pass at implementation looked something like this:

public func withTimeout<T>(
        _ timeout: SuspendingClock.Duration, 
        work: @escaping () async throws -> T) async throws -> T 
{
    let deadline = SuspendingClock().now + timeout
    return try await withThrowingTaskGroup(of: T.self) { group in
        group.addTask {
            try await work()
        }
        group.addTask {
            try await Task.sleep(until: deadline, clock: .suspending)
            // If we got this far without being cancelled, then the
            // timeout has expired, so we should throw an error.
            throw CancellationError()
        }

        // Wait for the work to complete, or for the timeout to throw
        let result = try await group.next()

        // Cancel whichever task is still waiting.
        group.cancelAll()

        return result!
    }
}

However, because task groups always wait for all their children, this doesn’t actually work for me. It signals cancellation to the hanging API, but the API never responds to that cancellation, so the caller is left hanging.
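To see why, here is a small sketch that assumes a hypothetical `ignoresCancellation(delay:)` stand-in for an API that finishes on its own schedule no matter what. The group's short "timeout" child finishes first and `cancelAll()` is called, but the group still cannot return until the slow child is done.

```swift
import Foundation

// Hypothetical stand-in for an API that ignores cancellation: it always
// takes `delay` seconds, whether or not the calling task is cancelled.
func ignoresCancellation(delay: TimeInterval) async {
    await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
        DispatchQueue.global().asyncAfter(deadline: .now() + delay) {
            continuation.resume()
        }
    }
}

// Races the slow call against a 50 ms "timeout" child and reports how long
// the task group took to return.
func timeTaskGroupRace() async -> Duration {
    let clock = ContinuousClock()
    let start = clock.now
    await withTaskGroup(of: Void.self) { group in
        group.addTask { await ignoresCancellation(delay: 0.5) }
        group.addTask { _ = try? await Task.sleep(for: .milliseconds(50)) }
        _ = await group.next()  // the timeout child wins the race...
        group.cancelAll()       // ...cancellation is signalled, but ignored...
    }                           // ...and the group still waits for the slow child here.
    return clock.now - start
}
```

The measured time is roughly the slow child's 0.5 s, not the 50 ms "timeout".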

Instead, I want to create a version that spins the work off into a separate (unstructured) task that can be allowed to continue even after the timeout is reached. I could have used something like Task.select to create two tasks (one for the work, one for the timeout), and then wait for whichever finishes first… but it seems that work never got merged, and I no longer see it in the async-algorithms package.

Instead, I’ve created this, using merge from the async-algorithms package. I’m looking for feedback on whether this is a reasonable implementation. Are there ways I could simplify it? Or are there things I’m not considering?

import AsyncAlgorithms
import Synchronization

public func withHardTimeout<T>(
        _ timeout: SuspendingClock.Duration,
        work: @escaping () async throws -> T) async throws -> T 
{
    let deadline = SuspendingClock().now + timeout
    
    // These mutexes let us pass along cancellation to the child tasks,
    // since cancellation may happen concurrently with the work.
    let workTaskMutex: Mutex<Task<Void, any Error>?> = .init(nil)
    let cancellationTaskMutex: Mutex<Task<Void, any Error>?> = .init(nil)
    
    return try await withTaskCancellationHandler {
        let (workStream, workContinuation) = AsyncThrowingStream.makeStream(of: T.self)
        let (cancelStream, cancelContinuation) = AsyncThrowingStream.makeStream(of: T.self)
        
        let workTask = Task {
            let result = try await work()
            workContinuation.yield(result)
            workContinuation.finish()
        }
        workTaskMutex.withLock { $0 = workTask }
        
        let cancellationTask = Task {
            try await Task.sleep(until: deadline, clock: .suspending)
            // If we got this far without being cancelled, then the
            // timeout has expired, so we should throw an error.
            cancelContinuation.finish(throwing: CancellationError())
        }
        cancellationTaskMutex.withLock { $0 = cancellationTask }
        
        let combinedStream = merge(workStream, cancelStream)
        let result = try await combinedStream.first(where: { _ in true })

        // If we got this far without cancelling, it means we got a result
        // from the workStream. So we're safe to unwrap it.
        return result!
    } onCancel: {
        workTaskMutex.withLock { $0?.cancel() }
        cancellationTaskMutex.withLock { $0?.cancel() }
    }
}

  1. I’ve filed a bug about this; I suspect a hung system daemon, but that’s irrelevant to this post. Apple folks: see FB21292281.


For one thing I think you have a potential race if your task gets cancelled after you've begun the body of withTaskCancellationHandler but before you install the task handles into the mutexes.
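One way to close that window is to record that cancellation has already happened, so that a handle installed afterwards is cancelled immediately instead of being missed. Here is a sketch using a hypothetical `CancellableSlot` type, built on `NSLock` (`Mutex` from Synchronization would work the same way):

```swift
import Foundation

// Hypothetical helper: holds at most one task handle and remembers whether
// cancel() has already been called, so a late-arriving handle is not missed.
final class CancellableSlot: @unchecked Sendable {
    private let lock = NSLock()
    private var task: Task<Void, any Error>?
    private var isCancelled = false

    func install(_ newTask: Task<Void, any Error>) {
        lock.lock()
        let cancelledAlready = isCancelled
        if !cancelledAlready { task = newTask }
        lock.unlock()
        // Cancellation happened before the handle arrived: cancel it now.
        if cancelledAlready { newTask.cancel() }
    }

    func cancel() {
        lock.lock()
        isCancelled = true
        let current = task
        task = nil
        lock.unlock()
        current?.cancel()
    }
}
```

In withHardTimeout, the two mutexes would become two slots, the body would call install where it currently calls withLock, and onCancel would call cancel() on both.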

Generally, the timeout concept is not ideal for composition; a deadline for when things should be done is considerably better, both from an organizational standpoint and for correctness about when the event actually happens. Consider, for example, an HTTP request: if you specify a timeout, how much of that timeout gets set aside for DNS resolution, how much for completing the transmission of the packets, and how much for the completion of the response?
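A small sketch of the difference, using hypothetical `runStep` and `request` functions where each step stands in for one phase of such a request. Every phase is handed the same absolute `ContinuousClock.Instant`, so time spent in the DNS phase is automatically unavailable to later phases, and the whole request cannot overrun the caller's deadline. With per-phase timeouts, by contrast, the phases' budgets would simply add up.

```swift
// Hypothetical error naming the phase that ran out of time.
struct DeadlineExceeded: Error { let step: String }

// Hypothetical phase that needs `cost` to finish. It gives up at the shared
// deadline rather than starting a fresh, per-phase timeout.
func runStep(_ name: String, cost: Duration, deadline: ContinuousClock.Instant) async throws {
    let clock = ContinuousClock()
    let finish = clock.now + cost
    if finish <= deadline {
        try await Task.sleep(until: finish, clock: clock)
    } else {
        // Finishing would overrun the deadline: wait only until it, then fail.
        try await Task.sleep(until: deadline, clock: clock)
        throw DeadlineExceeded(step: name)
    }
}

// A three-phase "request" whose phases all share one deadline.
func request(deadline: ContinuousClock.Instant) async throws {
    try await runStep("dns", cost: .milliseconds(200), deadline: deadline)
    try await runStep("send", cost: .milliseconds(200), deadline: deadline)
    try await runStep("receive", cost: .milliseconds(200), deadline: deadline)
}
```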

Task.select was initially an interesting idea, but we quickly gave it up because it was rather wasteful and really couldn't be implemented in the runtime with any real efficiency. That said, there is currently no real way to efficiently spawn a secondary task just for the purpose of a deadline or timeout; today it is a pretty heavyweight operation (compared to what it is actually doing...).

What you have could be refined a fair amount; the merge and AsyncStream are probably not needed, and this could be built from thinner abstractions. I think (if done very carefully) a withTaskGroup-based version would be thinner and perhaps simpler.

One additional side note: it might be worth using a different error than CancellationError, to signal that this is a timeout rather than a cancellation.
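Putting those suggestions together, here is one possible thinner shape, offered as a sketch rather than a vetted implementation. It drops `merge` and `AsyncStream` and uses a continuation that is resumed exactly once by whichever of two unstructured tasks (work or timer) finishes first, or by the cancellation handler. Because a task group would still wait for a hung child, the work stays in an unstructured task, so a hung `work()` is cancelled and abandoned rather than awaited. `RaceState`, `TimeoutError`, and this exact `withHardTimeout` signature are hypothetical, and `NSLock` stands in for `Mutex`.

```swift
import Foundation

// Hypothetical error, distinct from CancellationError, for "ran out of time".
struct TimeoutError: Error {}

// Resumes a continuation exactly once and tracks the tasks racing to do so.
// A result that arrives before the continuation exists is kept as pending.
private final class RaceState<T: Sendable>: @unchecked Sendable {
    private let lock = NSLock()
    private var continuation: CheckedContinuation<T, any Error>?
    private var pending: Result<T, any Error>?
    private var tasks: [Task<Void, Never>] = []
    private var isFinished = false

    func setContinuation(_ newContinuation: CheckedContinuation<T, any Error>) {
        lock.lock()
        let early = pending
        pending = nil
        if early == nil { continuation = newContinuation }
        lock.unlock()
        if let early { newContinuation.resume(with: early) }
    }

    func addTask(_ task: Task<Void, Never>) {
        lock.lock()
        let finishedAlready = isFinished
        if !finishedAlready { tasks.append(task) }
        lock.unlock()
        if finishedAlready { task.cancel() }
    }

    func finish(with result: Result<T, any Error>) {
        lock.lock()
        guard !isFinished else { lock.unlock(); return }
        isFinished = true
        let losers = tasks
        tasks = []
        let waiting = continuation
        continuation = nil
        if waiting == nil { pending = result }
        lock.unlock()
        // Cancel whatever is still running; a task that ignores cancellation
        // simply keeps running, detached from the caller.
        losers.forEach { $0.cancel() }
        waiting?.resume(with: result)
    }
}

func withHardTimeout<T: Sendable>(
    _ timeout: Duration,
    work: @escaping @Sendable () async throws -> T
) async throws -> T {
    let clock = SuspendingClock()
    let deadline = clock.now + timeout
    let state = RaceState<T>()
    return try await withTaskCancellationHandler {
        try await withCheckedThrowingContinuation { continuation in
            state.setContinuation(continuation)
            state.addTask(Task {
                do {
                    let value = try await work()
                    state.finish(with: .success(value))
                } catch {
                    state.finish(with: .failure(error))
                }
            })
            state.addTask(Task {
                do {
                    try await Task.sleep(until: deadline, clock: clock)
                    state.finish(with: .failure(TimeoutError()))
                } catch {
                    // The sleep was cancelled: the race is already decided.
                }
            })
        }
    } onCancel: {
        state.finish(with: .failure(CancellationError()))
    }
}
```

Since the timer's sleep is cancelled when the work wins, it does not linger until the deadline, and errors thrown by `work()` are reported directly instead of surfacing as a timeout.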


Leaving the implementation aside for a second and focusing just on the public API: in my opinion, it is critical that this composes nicely with surrounding code. In particular, this means:

  1. The closure should be non-escaping
  2. The closure should be neither sending nor Sendable
  3. The closure and API should be nonisolated(nonsending)

Something along those lines:

nonisolated(nonsending) public func withTimeout<Return, Clock: _Concurrency.Clock>(
    in timeout: Clock.Duration,
    clock: Clock,
    body: nonisolated(nonsending) () async throws -> Return
) async throws -> Return

I have an experimental implementation that uses a few hacks I would like to get rid of. I am also not sure that the nonisolated(unsafe) is actually correct here and doesn't cause a data race. Here is what I have right now:

private enum TaskResult<T: Sendable>: Sendable {
    case success(T)
    case error(any Error)
    case timedOut
    case cancelled
}

package struct TimeOutError: Error {
    var underlying: any Error
}

nonisolated(nonsending) public func withTimeout<Return, Clock: _Concurrency.Clock>(
    in timeout: Clock.Duration,
    clock: Clock,
    body: nonisolated(nonsending) () async throws -> Return
) async throws -> Return {
    nonisolated(unsafe) let body = body
    return try await _withTimeout(in: timeout, clock: clock) {
        try await body()
    }
}

nonisolated(nonsending) public func _withTimeout<T: Sendable, Clock: _Concurrency.Clock>(
    in timeout: Clock.Duration,
    clock: Clock,
    body: @Sendable () async throws -> T
) async throws -> T {
    try await withoutActuallyEscaping(body) { escapingBody in
        var t = Optional(escapingBody)
        return try await __withTimeout(in: timeout, clock: clock, body: &t)
    }
}

nonisolated(nonsending) public func __withTimeout<T: Sendable, Clock: _Concurrency.Clock>(
    in timeout: Clock.Duration,
    clock: Clock,
    body: inout Optional<@Sendable () async throws -> T>
) async throws -> T {
    let result: Result<T, any Error> = await withTaskGroup(of: TaskResult<T>.self) { group in
        let body = body.takeSending()!
        group.addTask {
            do {
                return .success(try await body())
            } catch {
                return .error(error)
            }
        }
        group.addTask {
            do {
                try await clock.sleep(for: timeout, tolerance: .zero)
                return .timedOut
            } catch {
                return .cancelled
            }
        }
        
        switch await group.next() {
        case .success(let result):
            // Work returned a result. Cancel the timer task and return
            group.cancelAll()
            return .success(result)
        case .error(let error):
            // Work threw. Cancel the timer task and rethrow
            group.cancelAll()
            return .failure(error)
        case .timedOut:
            // Timed out, cancel the work task.
            group.cancelAll()
            
            switch await group.next() {
            case .success(let result):
                return .success(result)
            case .error(let error):
                return .failure(TimeOutError(underlying: error))
            case .timedOut, .cancelled, .none:
                // We already got a result from the sleeping task so we can't get another one or none.
                fatalError("Unexpected task result")
            }
        case .cancelled:
            switch await group.next() {
            case .success(let result):
                return .success(result)
            case .error(let error):
                return .failure(TimeOutError(underlying: error))
            case .timedOut, .cancelled, .none:
                // We already got a result from the sleeping task so we can't get another one or none.
                fatalError("Unexpected task result")
            }
        case .none:
            fatalError("Unexpected task result")
        }
    }
    return try result.get()
}

This would only work with well-behaved APIs that react promptly to cancellation. I don’t think this will solve the problem in this thread, where the function never returns: we would need to return from withTimeout while the user-defined closure keeps “running”, and that requires the closure to be able to escape.


Yes, you are right, this won’t solve the above problem. That problem can only be solved by using an unstructured task and essentially abandoning the result.

Yeah, I’ve seen that advice elsewhere. And yet, someone has to create the deadline somewhere, right? I could certainly make withTimeout a convenience wrapper around a similar withDeadline.
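A sketch of that layering, generic over Clock in the style of the signature proposed earlier in the thread: withDeadline does the work, and withTimeout only converts a duration into an instant. For brevity this withDeadline is task-group based, so (as discussed in this thread) it only helps when the body reacts to cancellation. The names and `DeadlineExceededError` are hypothetical.

```swift
// Hypothetical error thrown when the deadline passes before the body finishes.
struct DeadlineExceededError: Error {}

// Races `body` against a sleep until `deadline`. Task-group based, so it only
// returns promptly if `body` responds to cancellation.
func withDeadline<T: Sendable, C: Clock>(
    _ deadline: C.Instant,
    clock: C,
    body: @escaping @Sendable () async throws -> T
) async throws -> T {
    try await withThrowingTaskGroup(of: T.self) { group in
        group.addTask { try await body() }
        group.addTask {
            try await clock.sleep(until: deadline, tolerance: nil)
            throw DeadlineExceededError()
        }
        // Cancel the losing child on every exit path (result or thrown error).
        defer { group.cancelAll() }
        // next() rethrows the first child error, e.g. DeadlineExceededError.
        guard let result = try await group.next() else { throw CancellationError() }
        return result
    }
}

// The timeout flavour is a thin convenience wrapper over the deadline one.
func withTimeout<T: Sendable, C: Clock>(
    in timeout: C.Duration,
    clock: C,
    body: @escaping @Sendable () async throws -> T
) async throws -> T {
    try await withDeadline(clock.now.advanced(by: timeout), clock: clock, body: body)
}
```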

I spent a fair amount of time trying to make it work with a task group… but ultimately, a task group requires that all of its child tasks are complete before it returns, right? In the case I described where the work task just hangs and never returns… is a task group usable? It seems to me that if I have work that might hang indefinitely, but I want to return even before that work completes, I have to spin that off into a separate unstructured Task.


Double-check that it works correctly if you call it N times and the system API times out all N times (e.g. set a very small timeout for the test), with N a sufficiently large number like 20.


I remember using this device before async/await for a similar purpose:

import Foundation

extension DispatchQueue {
    func async<T>(withTimeout timeout: TimeInterval, operation: @escaping () -> T, completion: @escaping (Result<T, Error>) -> Void) {
        var done = false  // only accessed on serialQueue, so the first result wins
        let serialQueue = DispatchQueue(label: "serialQueue")
        
        func complete(_ result: Result<T, Error>) {
            serialQueue.async {
                if !done {
                    done = true
                    self.async {
                        completion(result)
                    }
                }
            }
        }
        
        DispatchQueue.global().async {
            complete(.success(operation()))
        }
        DispatchQueue.global().asyncAfter(deadline: .now() + timeout) {
            complete(.failure(NSError(domain: NSPOSIXErrorDomain, code: Int(ETIMEDOUT))))
        }
    }
}

In the case I described where the work task just hangs and never returns… is a task group usable?

No, you are right that a child task prevents you from returning before it is done. But using an unstructured Task might also have issues, depending on how exactly the work closure "hangs":

  1. It could suspend forever, by not resuming some internal continuation.
  2. It could block, e.g. by never signaling a condition lock/semaphore or similar.
  3. It could run forever, e.g. by entering an infinite loop.

When designing your timeout function it's important to consider which of the cases you want to handle gracefully.

I think only the first case can be handled properly by using (unstructured) tasks. The other cases block the thread they are on forever, so in those cases you should not be using Swift's cooperative thread pool.

You could use other thread APIs to perform work, but the thread (and maybe other resources) would be wasted anyway.
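For the blocking and spinning cases, one option is to run the work on its own Thread and bridge the result back with a continuation that is resumed by either the thread or a Dispatch timer, whichever comes first. This is sketched below with hypothetical names (`runOnDedicatedThread`, `OneShot`, `BlockingTimeoutError`). A hung body then costs one dedicated thread instead of a cooperative-pool thread, but as noted above, that thread is still lost.

```swift
import Foundation

// Hypothetical error for "the blocking work did not finish in time".
struct BlockingTimeoutError: Error {}

// Resumes the wrapped continuation at most once; later calls are ignored.
final class OneShot<T: Sendable>: @unchecked Sendable {
    private let lock = NSLock()
    private var continuation: CheckedContinuation<T, any Error>?

    init(_ continuation: CheckedContinuation<T, any Error>) {
        self.continuation = continuation
    }

    func resume(with result: Result<T, any Error>) {
        lock.lock()
        let waiting = continuation
        continuation = nil
        lock.unlock()
        waiting?.resume(with: result)
    }
}

// Runs `work` on a dedicated thread rather than Swift's cooperative pool, and
// stops waiting after `timeout` seconds. If `work` never returns, its thread
// stays blocked; only the caller is released.
func runOnDedicatedThread<T: Sendable>(
    timeout: TimeInterval,
    _ work: @escaping @Sendable () -> T
) async throws -> T {
    try await withCheckedThrowingContinuation { continuation in
        let oneShot = OneShot(continuation)
        Thread { oneShot.resume(with: .success(work())) }.start()
        DispatchQueue.global().asyncAfter(deadline: .now() + timeout) {
            oneShot.resume(with: .failure(BlockingTimeoutError()))
        }
    }
}
```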


Could you show a minimal example of each?
I thought they were all the same, but I could be wrong. In particular, in (1) and (2), what exactly is it doing, if not entering some sort of infinite loop?

My interpretation of (1) is that "suspend" is being used in the Swift Concurrency sense. I.e., the cooperative Swift Concurrency thread has been donated back to the pool and is available to run work, but the task may never get resumed. In (2), the actual OS thread would be blocked, but it may not be busy-waiting (i.e., the OS scheduler knows it cannot do any work currently and so will not schedule it). In (3), the thread is not available to the Swift Concurrency thread pool and it is actively being scheduled by the OS.

Though I don't think the situation BJ describes is only relevant when the timed-out task is really-truly blocked. Sometimes you may just have a situation where the code you're calling cannot be trusted to clean itself up 'right away' in response to cancellation, even if it's still making forward progress.


In my particular case, it does seem to be suspended (case 1 as described above); I suspect there’s a daemon that’s never calling back, so the continuation is never called.

You could write a misbehaving function yourself like this:

func suspendForever() async {
  await withUnsafeContinuation { (continuation: UnsafeContinuation<Void,Never>) in
    // never call the continuation
  }
}

Thank you.
Maybe withUnsafeContinuation could benefit from having a timeout parameter?
Or, dare I say... await itself?
Are there any precedents for that in other languages?

Alternatively, withCheckedContinuation can see when the continuation is leaked, so if we could hook into the same event we could see, at least in some cases, when the callee is misbehaving, without worrying about the imprecision of a timer.

Yeah, but even if we were able to do that (e.g. if UnsafeContinuation had a deinit, or by other similar means), the non-leaking use case would still be unaccounted for:

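// Stored globally, so the continuation is never leaked.
nonisolated(unsafe) var gContinuation: UnsafeContinuation<Void, Never>?

func suspendForever() async {
    await withUnsafeContinuation { (continuation: UnsafeContinuation<Void,Never>) in
        gContinuation = continuation
        // So it is not leaking, and could potentially be called in the distant future,
        // but it is never called.
    }
}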

For this thread’s use case, a continuation with automatic handling of cancellation would be even more useful: the timeout would be unnecessary if cancellation resumed the continuation.

Various implementations of a CancellingContinuation have been proposed. In my projects I need it often enough to want it in the standard library.
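As an illustration only (this is not the standard-library API being asked for, and all names are hypothetical), a minimal CancellingContinuation can be built from withTaskCancellationHandler plus a lock-protected, resume-once box. Whichever happens first, the callback or cancellation of the awaiting task, resumes the continuation, and the other is ignored.

```swift
import Foundation

// Hypothetical resume-once box. A result that arrives before the real
// continuation is installed (e.g. cancellation raced ahead) is kept as pending.
final class CancellingContinuation<T: Sendable>: @unchecked Sendable {
    private let lock = NSLock()
    private var continuation: CheckedContinuation<T, any Error>?
    private var pending: Result<T, any Error>?
    private var isResumed = false

    func install(_ newContinuation: CheckedContinuation<T, any Error>) {
        lock.lock()
        let early = pending
        pending = nil
        if early == nil { continuation = newContinuation }
        lock.unlock()
        if let early { newContinuation.resume(with: early) }
    }

    func resume(with result: Result<T, any Error>) {
        lock.lock()
        guard !isResumed else { lock.unlock(); return }
        isResumed = true
        let waiting = continuation
        continuation = nil
        if waiting == nil { pending = result }
        lock.unlock()
        waiting?.resume(with: result)
    }
}

// Suspends until `body`'s callback resumes the box, or until the task is
// cancelled, in which case the caller gets CancellationError right away.
func withCancellingContinuation<T: Sendable>(
    _ body: (CancellingContinuation<T>) -> Void
) async throws -> T {
    let box = CancellingContinuation<T>()
    return try await withTaskCancellationHandler {
        try await withCheckedThrowingContinuation { continuation in
            box.install(continuation)
            body(box)
        }
    } onCancel: {
        box.resume(with: .failure(CancellationError()))
    }
}
```

If you control the code that creates the continuation, wrapping the call this way means cancellation alone releases the caller, so even a task-group-based timeout would return promptly.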


Could you show a minimal example of each?

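import Foundation

func doesNotReturnAsync() async {
    // The unstructured inner task does not inherit cancellation, so this waits
    // effectively forever (there is no `.ever`; 1_000_000_000 s is about 31 years).
    _ = await Task { try await Task.sleep(for: .seconds(1_000_000_000)) }.result
}

func doesNotReturnBlocking() {
    let lock = NSLock()
    lock.lock(); lock.lock()  // NSLock is not recursive: the second lock() never returns
}

func doesNotReturnSpinning() {
    while true {}
}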

Obviously these minimal examples seem nonsensical, but real-world cases can conceptually result in similar situations. The latter two will block the thread, while the first one will only leak the task (and possibly resources tied to the task's continuation).
