In my app, I’m calling a system async API that sometimes seems to hang[1]. It never returns, and does not respond to cancellation. I want to add a timeout handler to this API to deal with the possibility of failure, so that I can write something like this:
    let x = try await withTimeout(.seconds(5)) {
        try await thingThatMightHang()
    }
My initial pass at implementation looked something like this:
    // T and the closure are Sendable because they cross into child tasks.
    public func withTimeout<T: Sendable>(
        _ timeout: SuspendingClock.Duration,
        work: @escaping @Sendable () async throws -> T) async throws -> T
    {
        let deadline = SuspendingClock().now + timeout
        return try await withThrowingTaskGroup(of: T.self) { group in
            group.addTask {
                try await work()
            }
            group.addTask {
                try await Task.sleep(until: deadline, clock: .suspending)
                // If we got this far without being cancelled, then the
                // timeout has expired, so we should throw an error.
                throw CancellationError()
            }
            // Wait for the work to complete, or for the timeout to throw.
            let result = try await group.next()
            // Cancel whichever task is still waiting.
            group.cancelAll()
            // Safe to unwrap: the group has two children, so next() is non-nil.
            return result!
        }
    }
However, because task groups always wait for all their children, this doesn’t actually work for me; it signals cancellation to the hanging API, but it never responds to the cancellation, and the caller is left hanging.
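To make the problem concrete, here is a minimal sketch of that behavior. `ignoresCancellation()` and `measureGroupExit()` are hypothetical names, and the stand-in finishes after one second (unlike the real API) so that the example terminates. The group is told to cancel after about 100 ms, yet the measured time should come out at roughly one second, because the group does not return until the slow child has finished.

```swift
// Hypothetical stand-in for the hanging API: it ignores cancellation,
// but finishes after one second so that this example terminates.
func ignoresCancellation() async -> String {
    // Awaiting a detached task's value does not forward cancellation to it.
    await Task.detached {
        try? await Task.sleep(for: .seconds(1))
        return "done"
    }.value
}

// Measures how long the task group takes to return.
func measureGroupExit() async -> Duration {
    let clock = ContinuousClock()
    return await clock.measure {
        await withTaskGroup(of: String?.self) { group in
            group.addTask { await ignoresCancellation() }
            group.addTask {
                try? await Task.sleep(for: .milliseconds(100))
                return nil  // plays the role of the timeout firing
            }
            _ = await group.next()  // the 100 ms child finishes first
            group.cancelAll()       // signalled, but the slow child ignores it
            // The group still waits here, implicitly, for the slow child.
        }
    }
}
```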
Instead, I want to create a version that spins the work off into a separate (unstructured) task that can be allowed to continue even after the timeout is reached. I could have used something like Task.select to create two tasks (one for the work, one for the timeout), and then wait for whichever finishes first… but it seems that work never got merged, and I no longer see it in the async-algorithms package.
So I’ve written the version below, which uses the merge function from the async-algorithms package to combine two streams. I’m looking for feedback on whether this is a reasonable implementation. Are there ways I could simplify it? Are there things I’m not considering?
    import AsyncAlgorithms   // provides merge(_:_:)
    import Synchronization   // provides Mutex

    public func withHardTimeout<T: Sendable>(
        _ timeout: SuspendingClock.Duration,
        work: @escaping @Sendable () async throws -> T) async throws -> T
    {
        let deadline = SuspendingClock().now + timeout
        // These mutexes let us pass along cancellation to the child tasks,
        // since cancellation may happen concurrently with the work.
        let workTaskMutex: Mutex<Task<Void, Never>?> = .init(nil)
        let cancellationTaskMutex: Mutex<Task<Void, any Error>?> = .init(nil)
        return try await withTaskCancellationHandler {
            let (workStream, workContinuation) = AsyncThrowingStream.makeStream(of: T.self)
            let (cancelStream, cancelContinuation) = AsyncThrowingStream.makeStream(of: T.self)
            let workTask = Task {
                do {
                    workContinuation.yield(try await work())
                    workContinuation.finish()
                } catch {
                    // Forward errors thrown by the work; otherwise they would be
                    // lost and the caller would keep waiting until the timeout.
                    workContinuation.finish(throwing: error)
                }
            }
            workTaskMutex.withLock { $0 = workTask }
            let cancellationTask = Task {
                try await Task.sleep(until: deadline, clock: .suspending)
                // If we got this far without being cancelled, then the
                // timeout has expired, so we should throw an error.
                cancelContinuation.finish(throwing: CancellationError())
            }
            cancellationTaskMutex.withLock { $0 = cancellationTask }
            let combinedStream = merge(workStream, cancelStream)
            // A nil here means the merged stream ended without yielding a value;
            // report that as cancellation rather than force-unwrapping.
            guard let result = try await combinedStream.first(where: { _ in true }) else {
                throw CancellationError()
            }
            return result
        } onCancel: {
            workTaskMutex.withLock { $0?.cancel() }
            cancellationTaskMutex.withLock { $0?.cancel() }
        }
    }
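One possible simplification, as a rough sketch rather than a vetted implementation: both unstructured tasks can feed a single AsyncThrowingStream, which removes the need for merge and for the mutexes. The names `withHardTimeoutSketch` and `TimedOut` are hypothetical. The sketch assumes that the first call to `finish` on a stream continuation wins and later calls are ignored, and that iterating the stream ends when the consuming task is cancelled; both match my understanding of AsyncThrowingStream, but are worth verifying.

```swift
// Hypothetical error type, so a timeout can be told apart from cancellation.
struct TimedOut: Error {}

func withHardTimeoutSketch<T: Sendable>(
    _ timeout: Duration,
    work: @escaping @Sendable () async throws -> T
) async throws -> T {
    let (stream, continuation) = AsyncThrowingStream.makeStream(of: T.self)

    // Unstructured task: nothing waits for it if it never returns.
    let workTask = Task {
        do {
            continuation.yield(try await work())
            continuation.finish()
        } catch {
            continuation.finish(throwing: error)
        }
    }
    // Ends the stream with TimedOut unless it is cancelled first.
    let timerTask = Task {
        try await Task.sleep(for: timeout)
        continuation.finish(throwing: TimedOut())
    }
    // Signal both tasks on every exit path; this does not wait for them.
    defer {
        workTask.cancel()
        timerTask.cancel()
    }

    // Whichever task reaches the continuation first decides the outcome.
    for try await value in stream {
        return value
    }
    // The stream ended without a value or an error. Assumption: this is
    // what happens when the calling task is cancelled during iteration.
    throw CancellationError()
}
```

Compared with the merge-based version, this depends only on the standard library, and a timeout surfaces as a distinct error instead of CancellationError. The work task is still only signalled, never awaited, so a call that ignores cancellation keeps running in the background.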
[1] I’ve filed a bug about this. I suspect a hung system daemon, but that’s irrelevant to this post. Apple folks: see FB21292281.