Automatically Cancelling Continuations

When writing async code I often need to combine CheckedContinuation APIs with task cancellation.

Here's a motivating example for a network fetch de-duplication service:

func fetch(from url: URL) async throws -> Data {

    let operation = lookupOrCreateRunningOperation(for: url)

    return try await withCheckedThrowingContinuation { continuation in
        operation.addWaitingTask(continuation)
        // operation eventually resumes waiting tasks
    }
}

The missing piece is to unblock the suspended task on cancellation, using something like:

continuation.resume(throwing: CancellationError())

It is not easy to add this functionality because withTaskCancellationHandler is difficult to get right in a thread-safe way. Also, automatic cancellation interferes with the requirement to resume the continuation exactly once: how would other code know whether the continuation has already been resumed by cancellation?

This led me to build CancellingContinuation as a reusable component, modeled after CheckedContinuation. It can serve as a drop-in replacement and provides a similar API:

func fetch(from url: URL) async throws -> Data {

    let operation = lookupOrCreateRunningOperation(for: url)

    return try await withCancellingContinuation { continuation in
        operation.addWaitingTask(continuation)
    }
}

The continuation in the closure is a thread-safe wrapper around CheckedContinuation which handles cancellation.
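To make the idea of such a wrapper concrete, here is a minimal sketch. It is not the code from the linked repository: the names SketchContinuation and withSketchContinuation are hypothetical, and unlike the real API this version calls the closure before the task suspends and therefore buffers an early result. The point is only that a small lock-protected state machine lets cancellation and regular resumption race safely, and that resume can report whether it actually delivered the result.

```swift
import Foundation

// Hypothetical sketch, not the implementation from the repository.
final class SketchContinuation<T: Sendable>: @unchecked Sendable {
    private enum State {
        case pending                                     // created, task not yet suspended
        case early(Result<T, any Error>)                 // result arrived before suspension
        case waiting(CheckedContinuation<T, any Error>)  // task is suspended
        case finished                                    // result was delivered
    }

    private let lock = NSLock()
    private var state = State.pending

    /// Returns true if this call supplied the result, false if the
    /// continuation had already been resumed or cancelled.
    @discardableResult
    func resume(with result: Result<T, any Error>) -> Bool {
        var toResume: CheckedContinuation<T, any Error>?
        var delivered = true
        lock.lock()
        switch state {
        case .pending:
            state = .early(result)
        case .waiting(let continuation):
            state = .finished
            toResume = continuation
        case .early, .finished:
            delivered = false
        }
        lock.unlock()
        toResume?.resume(with: result)   // resumed after the lock is released
        return delivered
    }

    func cancel() {
        resume(with: .failure(CancellationError()))
    }

    fileprivate func attach(_ continuation: CheckedContinuation<T, any Error>) {
        var early: Result<T, any Error>?
        lock.lock()
        switch state {
        case .pending:
            state = .waiting(continuation)
        case .early(let result):
            state = .finished
            early = result
        case .waiting, .finished:
            lock.unlock()
            preconditionFailure("awaited more than once")
        }
        lock.unlock()
        if let early { continuation.resume(with: early) }
    }
}

func withSketchContinuation<T: Sendable>(
    _ body: (SketchContinuation<T>) -> Void
) async throws -> T {
    let wrapper = SketchContinuation<T>()
    body(wrapper)
    return try await withTaskCancellationHandler {
        try await withCheckedThrowingContinuation { continuation in
            wrapper.attach(continuation)
        }
    } onCancel: {
        wrapper.cancel()   // may run before, during, or after attach
    }
}
```

Because cancel goes through the same resume path, whichever side comes first wins and the other call simply returns false.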

The code is here on GitHub.

I would love to hear your opinion on this construct as a reusable component in Swift Concurrency. Also, I'm not sure about the correctness of the code and would welcome any comments.

Note on extended API

Swift's CheckedContinuation cannot be constructed directly. Instead, it is passed as an argument to the closure of the with...Continuation APIs.

This is different in CancellingContinuation, mainly because of an implementation necessity: the cancellation handler needs something to address before the continuation can be accessed in the closure.

So I decided to embrace this feature by making it public. Next to the established with...Continuation API there is a second way to use the new construct:

func fetch(from url: URL) async throws -> Data {

    let operation = lookupOrCreateRunningOperation(for: url)

    let continuation = CancellingContinuation()
    operation.addWaitingTask(continuation)

    return try await continuation()
}

To await the result I'm using callAsFunction because I think it reads best at the point of use. (There's no verb in with[...]Continuation to reuse.)
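For readers who have not used callAsFunction with async code, here is a small sketch of the mechanics. DelayedValue is a hypothetical stand-in type and has nothing to do with the real CancellingContinuation; it only shows that callAsFunction may be async and throwing, so an instance can be awaited with call syntax.

```swift
// Hypothetical stand-in type; only the callAsFunction mechanics matter here.
struct DelayedValue {
    let value: Int

    // A method named callAsFunction lets an instance be called like a function.
    // It can be async and throwing like any other method.
    func callAsFunction() async throws -> Int {
        try Task.checkCancellation()
        return value
    }
}

func demo() async throws -> Int {
    let delayed = DelayedValue(value: 42)
    // Sugar for: try await delayed.callAsFunction()
    return try await delayed()
}
```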

Note on isolation

If you have problems with the isolation parameters, just comment them out for now. They are a somewhat new concept from SE-0420 and give my compiler some headaches.

Note on Hashable conformance

Another minor point is the conformance to Hashable: This is just there to enhance ergonomics for my typical use cases. Please ignore it in this discussion :)

TODO: Prevent deadlock

There's documentation (Cancellation handlers and locks) that reads like my use of resume in the cancellation handler is unsafe. I've not experienced any deadlocks, but nevertheless it might be safer to resume continuations outside of the lock's critical region.
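One way to keep resume out of the critical region is to let the lock protect only the hand-over of the continuation. The following is a sketch under that assumption; TakeOnce and cancelWaiter are hypothetical names and not part of the package.

```swift
import Foundation

/// Hypothetical helper: stores a value that can be taken out exactly once.
final class TakeOnce<Value>: @unchecked Sendable {
    private let lock = NSLock()
    private var value: Value?

    init(_ value: Value) {
        self.value = value
    }

    /// The critical region only swaps the stored value for nil.
    func take() -> Value? {
        lock.lock()
        defer { lock.unlock() }
        let taken = value
        value = nil
        return taken
    }
}

/// Could be called from a cancellation handler.
func cancelWaiter(_ box: TakeOnce<CheckedContinuation<Int, any Error>>) {
    // take() has already released the lock by the time resume runs,
    // and a second caller gets nil, so the continuation is resumed at most once.
    box.take()?.resume(throwing: CancellationError())
}
```

The regular resume path would use the same take() call, so whichever side runs first gets the continuation and the other side does nothing.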

That's an interesting case. I personally haven't thought about implementing de-duplication... need to check my tests for that...

Does operation conform to Sendable? If so, I would probably have the operation handle cancellation of the continuation, something like this:

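let id = UUID()
let operation = loadOrCreate(for: url)
return try await withTaskCancellationHandler {
    try await withCheckedThrowingContinuation { continuation in
        // the operation stores the continuation under this id
        operation.add(continuation, with: id)
    }
} onCancel: {
    // the operation resumes the matching continuation with a cancellation error
    operation.cancel(for: id)
}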

So the operation is responsible for all the continuation handling. Or am I missing something?

Yes, moving this responsibility into the operation would be an obvious solution without AutoCancellingContinuation.

My point was, though, that the combination of awaiting a continuation and responding to cancellation of the waiting task is a recurring pattern. I found it in many situations, so I've decided to build a reusable solution.

The AutoCancellingContinuation lives in a ConcurrencyTools package I'm using in my projects. But I would actually prefer to find a similar facility in Swift's standard library because it is generally useful.

In fact, a lot of cancellation strategies emerge quite regularly, and many of them could probably be covered by the stdlib. But resuming a continuation several times feels a bit wrong to me in the context of this implementation.

Still, I have doubts about how well it would play with the overall idea of cooperative cancellation, as such APIs somewhat limit your control over the rest of the work you might want to cancel.

It is also a question from the design perspective: is it actually a good idea after all? What I mean is that the current ways to handle cancellation force you to think about it a little more and perhaps choose a different path (if operation from the example is Sendable, handling cancellation becomes much easier).

I agree that this comes up a lot and the solution is hard to get right. It's definitely something worth exploring, I'll give your idea a look soon.

It would still be necessary to handle the race condition: cancellation can happen before or after the continuation is added to the operation. And since the cancellation handler is synchronous, making Operation.cancel thread-safe is another complication.
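To illustrate what handling both orderings could look like inside the operation, here is a rough sketch. WaiterRegistry and wait(on:) are hypothetical names, not code from this thread, and the add/cancel signatures are only modeled on the snippet above. A cancel that arrives before the matching add is remembered, so the later add can resume the continuation immediately.

```swift
import Foundation

/// Hypothetical stand-in for the waiter bookkeeping of an operation.
final class WaiterRegistry: @unchecked Sendable {
    private let lock = NSLock()
    private var waiting: [UUID: CheckedContinuation<Data, any Error>] = [:]
    private var cancelledEarly: Set<UUID> = []

    func add(_ continuation: CheckedContinuation<Data, any Error>, with id: UUID) {
        lock.lock()
        // Was cancel(for:) already called for this id?
        let wasCancelled = cancelledEarly.remove(id) != nil
        if !wasCancelled { waiting[id] = continuation }
        lock.unlock()
        if wasCancelled { continuation.resume(throwing: CancellationError()) }
    }

    func cancel(for id: UUID) {
        lock.lock()
        let continuation = waiting.removeValue(forKey: id)
        // Not registered (yet): remember the id for a later add.
        // Caveat of this sketch: a cancel after finish leaves a stale id behind.
        if continuation == nil { cancelledEarly.insert(id) }
        lock.unlock()
        continuation?.resume(throwing: CancellationError())
    }

    func finish(with result: Result<Data, any Error>) {
        lock.lock()
        let all = waiting
        waiting.removeAll()
        lock.unlock()
        for continuation in all.values { continuation.resume(with: result) }
    }
}

func wait(on registry: WaiterRegistry) async throws -> Data {
    let id = UUID()
    return try await withTaskCancellationHandler {
        try await withCheckedThrowingContinuation { continuation in
            registry.add(continuation, with: id)
        }
    } onCancel: {
        registry.cancel(for: id)
    }
}
```

Every continuation is removed from the shared state under the lock and resumed afterwards, so each one is resumed exactly once regardless of whether add, cancel, or finish gets there first.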

I agree. Maybe resuming should only be ignored for automatic cancellation?

I've updated the code so that the behavior on multiple resumes now matches CheckedContinuation.

@nikolai.ruhe I have exactly the same need and wrote this implementation:


import Foundation // for NSLock

public final class CancellableCheckedContinuation<T> : @unchecked Sendable {
    private var continuation: CheckedContinuation<T, any Error>?
    private let lock = NSLock()
    private var cancelled: Bool = false
    private var onCancel: (@Sendable () -> Void)?
    
    init() {
    }
    
    @available(iOS 13, *)
    public func setContinuation(_ continuation: CheckedContinuation<T, any Error>) -> Bool {
        var alreadyCancelled = false
        lock.withLock {
            if cancelled {
                alreadyCancelled = true
            } else {
                self.continuation = continuation
            }
        }
        if alreadyCancelled {
            continuation.resume(throwing: CancellationError())
        }
        return !alreadyCancelled
    }
    
    public func onCancel(_ action: @Sendable @escaping ()->Void) {
        var alreadyCancelled = false
        lock.withLock {
            if cancelled {
                alreadyCancelled = true
            } else {
                self.onCancel = action
            }
        }
        if alreadyCancelled {
            action()
        }
    }
    
    private func onContinuation(cancelled: Bool = false, _ action: (CheckedContinuation<T, any Error>) -> Void) {
        var safeContinuation: CheckedContinuation<T, any Error>?
        var safeOnCancel: (@Sendable () -> Void)?
        lock.withLock {
            self.cancelled = self.cancelled || cancelled
            safeContinuation = continuation
            safeOnCancel = onCancel
            continuation = nil
            onCancel = nil
        }
        if let safeContinuation {
            action(safeContinuation)
        }
        if cancelled {
            safeOnCancel?()
        }
    }
    
    public func resume(returning value: T) {
        onContinuation { 
            $0.resume(returning: value)
        }
    }

    public func resume(throwing error: Error) {
        onContinuation { 
            $0.resume(throwing: error)
        }
    }
    
    public var isCancelled: Bool {
        var cancelled: Bool = false
        lock.withLock {
            cancelled = self.cancelled
        }
        return cancelled
    }
    
    func cancel() {
        onContinuation(cancelled: true) { 
            $0.resume(throwing: CancellationError())
        }
    }
}

extension CancellableCheckedContinuation where T == Void {
    public func resume() {
        self.resume(returning: ())
    }
}


public func withCancellableCheckedContinuation<T>(function: String = #function, _ body: @escaping @Sendable (CancellableCheckedContinuation<T>) -> Void) async throws -> T {
    let cancellableContinuation = CancellableCheckedContinuation<T> ()
    return try await withTaskCancellationHandler {
        return try await withCheckedThrowingContinuation(function: function) { continuation in
            if cancellableContinuation.setContinuation(continuation) {
                body(cancellableContinuation)
            }
        }
    } onCancel: {
        cancellableContinuation.cancel()
    }
}

I like how you handle the CancellingContinuation's state in your version, and perhaps I should have used withoutActuallyEscaping...

In mine, cancellation can also be handled on the client side, which means you can write:

return try await withCancellableCheckedContinuation { continuation in
    let task = Task {
        do {
            let result = try await action() // Task.isCancelled could be checked in here
            continuation.resume(returning: result)
        } catch {
            continuation.resume(throwing: error)
        }
    }
    continuation.onCancel {
        task.cancel()
    }
}

I'm not sure if it's possible to add conformances to existing stdlib types (see: Backwards-deployable Conformances), but the usability of withTaskCancellationHandler could be much nicer if CheckedContinuation conformed to Identifiable:

await withCheckedContinuation {
  continuations[$0.id] = $0
} onCancel: { id in
  Task { await self.cancelContinuation(with: id) }
}

That's the reason I added Hashable conformance to my implementation.

I'm not exactly sure how Identifiable can help, but there would be no problem adding the conformance to CancellingContinuation, as it's our own.

Good to see that you came to a similar implementation.

I like the explicit cancellation handler. In my use cases it's not really needed because cancellation would resume the continuation and the code following the await handles all situations similarly. But it's nice to have access to it in the closure when setting up an unstructured task like this.

Thanks, you are a saviour.