Cancellable CheckedContinuation

Hi everyone, I'm learning Swift Concurrency and am trying to figure out how to properly cancel tasks that wrap a callback-based function in a continuation.

As it happens, not all callback-based APIs in my project can be canceled:

let loadTask = loader.loadTask { data, error in
    // ...
}

loadTask.resume()
loadTask.cancel() // !!!: Unavailable

And I'm trying to wrap this code in a continuation.

I know that Task implicitly captures self, and therefore self will not be released until a result is returned or an error is thrown.

While thinking and searching for information I came up with this solution: make a safe storage for the continuation, with an additional method that resumes it by throwing CancellationError and then discards it.

// Generics are omitted to make the code easier to understand

private final class ContinuationStore: @unchecked Sendable {
    
	private let lock = NSLock()
	private var cont: CheckedContinuation<String, Error>?

	func set(_ cont: CheckedContinuation<String, Error>?) {
		withCriticalRegion {
			self.cont = cont
		}
	}

	func resume(with result: Result<String, Error>) {
		withCriticalRegion {
			cont?.resume(with: result)
			cont = nil // drop it so a later cancel() cannot resume it a second time
		}
	}

	func cancel() {
		withCriticalRegion {
			cont?.resume(throwing: CancellationError())
			cont = nil
		}
	}

	private func withCriticalRegion(_ execute: () -> Void) {
		lock.lock()
		defer { lock.unlock() }
		execute()
	}
}

And then use that storage together with the withTaskCancellationHandler function:

final class CancellableContinuationExample {
	private let contStore = ContinuationStore()
	private let longWorker = LongWorker()

	public init() {}

	public func load() async throws -> String {
		try await withTaskCancellationHandler {
			try await withCheckedThrowingContinuation { cont in
				contStore.set(cont)
				longWorker.longWorkWithCallback() { [contStore] result in
					contStore.resume(with: .success(result))
				}
			}
		} onCancel: {
			contStore.cancel()
		}
	}
}

Can you please tell me whether I am missing something in this approach, and whether there are better options?

Without commenting on whether this is the best approach, I think the ContinuationStore is slightly wrong. If the task that calls CancellableContinuationExample.load is already cancelled, the onCancel handler is called immediately, before the continuation has been stored, so the continuation will not be resumed until the longWorker has finished its work. You can fix this by adding a flag that records whether cancellation has already happened, so that the continuation is resumed with a CancellationError as soon as you set it:

private final class ContinuationStore: @unchecked Sendable {
	private let lock = NSLock()
	private var cont: CheckedContinuation<String, Error>?
	private var isCancelled: Bool = false

	// Returns false if cancellation already happened; the continuation is then
	// resumed right away. Assumes withCriticalRegion is generalized to return
	// the closure's value.
	func set(_ cont: CheckedContinuation<String, Error>) -> Bool {
		withCriticalRegion {
			if isCancelled {
				cont.resume(throwing: CancellationError())
				return false
			}
			self.cont = cont
			return true
		}
	}
	...
	func cancel() {
		withCriticalRegion {
			isCancelled = true
			cont?.resume(throwing: CancellationError())
			cont = nil
		}
	}
	...
}

and then skip starting the long-running work when set returns false:

try await withCheckedThrowingContinuation { cont in
	if !contStore.set(cont) { return }
	longWorker.longWorkWithCallback() { [contStore] result in
		contStore.resume(with: .success(result))
	}
}
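
For reference, here is a minimal self-contained sketch of the flag-based store discussed above. It is one possible shape, not the only one: the generic parameter, the private take() helper and the free function load(using:) are my own additions, and LongWorker is a hypothetical stand-in for the non-cancellable callback API (its 0.2 second delay is arbitrary). The store hands the continuation out at most once, so whichever of the callback or the cancellation handler arrives first wins and the other becomes a no-op.

```swift
import Foundation

/// Holds at most one continuation and makes sure it is resumed at most once.
final class ContinuationStore<Value: Sendable>: @unchecked Sendable {
    private let lock = NSLock()
    private var cont: CheckedContinuation<Value, Error>?
    private var isCancelled = false

    /// Stores the continuation. If cancel() already ran, resumes it with
    /// CancellationError instead and returns false.
    func set(_ cont: CheckedContinuation<Value, Error>) -> Bool {
        let accepted: Bool = withCriticalRegion {
            if isCancelled { return false }
            self.cont = cont
            return true
        }
        if !accepted { cont.resume(throwing: CancellationError()) }
        return accepted
    }

    func resume(with result: Result<Value, Error>) {
        take(markCancelled: false)?.resume(with: result)
    }

    func cancel() {
        take(markCancelled: true)?.resume(throwing: CancellationError())
    }

    /// Removes and returns the stored continuation, so only one caller gets it.
    private func take(markCancelled: Bool) -> CheckedContinuation<Value, Error>? {
        withCriticalRegion {
            if markCancelled { isCancelled = true }
            defer { cont = nil }
            return cont
        }
    }

    private func withCriticalRegion<T>(_ body: () -> T) -> T {
        lock.lock()
        defer { lock.unlock() }
        return body()
    }
}

/// Hypothetical stand-in for LongWorker: a callback API that cannot be cancelled.
struct LongWorker: Sendable {
    func longWorkWithCallback(_ completion: @escaping @Sendable (String) -> Void) {
        DispatchQueue.global().asyncAfter(deadline: .now() + 0.2) {
            completion("done")
        }
    }
}

func load(using worker: LongWorker) async throws -> String {
    // One store per call, so concurrent calls cannot overwrite each other.
    let store = ContinuationStore<String>()
    return try await withTaskCancellationHandler {
        try await withCheckedThrowingContinuation { cont in
            // Skip the work entirely if the task was cancelled before we got here.
            guard store.set(cont) else { return }
            worker.longWorkWithCallback { result in
                store.resume(with: .success(result))
            }
        }
    } onCancel: {
        store.cancel()
    }
}
```

Creating the store inside load(using:) rather than keeping it as a property is a deliberate choice in this sketch: with a shared store, two overlapping calls would replace each other's continuation. Note that cancelling still only abandons the result; the worker's callback runs to completion and is then ignored.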

CancellableContinuationExample has to be Sendable too, because it is captured by the onCancel closure, which is marked @Sendable since it may be called across an isolation boundary.

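But I suppose the bigger issue here is cancellation, or rather the lack of it. You've said that the API itself does not support cancellation. Therefore cancelling the task (by resuming the continuation early) won't stop the underlying work. Once that work is done, its callback will still fire, and resuming the continuation a second time would violate the contract that a continuation is resumed exactly once, so the callback must not reach a continuation that has already been resumed. Here the store guards against that by setting cont to nil in cancel(); a Task.isCancelled check inside the callback is not a reliable guard, because the callback does not necessarily run inside the task. Either way the work still isn't cancelled; you just don't receive its result.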

That's a great point, thank you!

I have another question: Can we avoid adding a flag to the ContinuationStore by checking if the task is canceled before opening withCheckedThrowingContinuation block?

public final class CancellableContinuation {

	private let continuationStorage = ContinuationStorage<String, Error>()
	private let longWorker = LongWorker()

	public init() {}

	public func load() async throws -> String {
		try await withTaskCancellationHandler {
			try Task.checkCancellation() // HERE
			return try await withCheckedThrowingContinuation { cont in
				continuationStorage.set(cont)
				longWorker.longWorkWithCallback(seconds: 5, result: "done") { [continuationStorage] result in
					continuationStorage.resume(returning: result)
				}
			}
		} onCancel: {
			continuationStorage.resume(throwing: CancellationError())
		}
	}
}

In general that's racy because the task (the one executing load) could be cancelled in-between the check and withCheckedThrowingContinuation. It's possible in a specific use that this doesn't happen, depending on what the caller does, but I'd be hesitant to make that kind of assumption. The compiler won't verify it for you.
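
To make the window concrete, here is a small deterministic model of that interleaving. It is only a sketch: the continuation is modelled as a plain closure, and Store, Resume and cancelledPromptly are hypothetical names rather than anything from the code above. It replays the order "check passes, cancel arrives, continuation is stored" against a store with and without the isCancelled flag.

```swift
import Foundation

/// Simplified model: a "continuation" is just a closure that receives the result.
typealias Resume = (Result<String, Error>) -> Void

final class Store {
    private let lock = NSLock()
    private var pending: Resume?
    private var isCancelled = false
    private let tracksCancellation: Bool

    init(tracksCancellation: Bool) {
        self.tracksCancellation = tracksCancellation
    }

    func set(_ resume: @escaping Resume) {
        lock.lock()
        let rejectNow = tracksCancellation && isCancelled
        if !rejectNow { pending = resume }
        lock.unlock()
        // With the flag, a continuation that arrives after cancel() fails at once.
        if rejectNow { resume(.failure(CancellationError())) }
    }

    func cancel() {
        lock.lock()
        isCancelled = true
        let resume = pending
        pending = nil
        lock.unlock()
        resume?(.failure(CancellationError()))
    }
}

/// Replays the problematic order and reports whether the waiter saw the
/// cancellation without having to wait for the worker.
func cancelledPromptly(tracksCancellation: Bool) -> Bool {
    let store = Store(tracksCancellation: tracksCancellation)
    var sawCancellation = false
    // 1. `try Task.checkCancellation()` passes: the task is not cancelled yet.
    // 2. The task is cancelled now; onCancel runs while the store is still empty.
    store.cancel()
    // 3. Only now does the continuation body run and store the continuation.
    store.set { result in
        if case .failure(let error) = result, error is CancellationError {
            sawCancellation = true
        }
    }
    return sawCancellation
}

print(cancelledPromptly(tracksCancellation: false)) // false
print(cancelledPromptly(tracksCancellation: true))  // true
```

Without the flag, the cancellation is lost in step 2 and the caller waits for the worker's callback. With the flag, the store remembers the cancellation and fails the late continuation immediately, which is why the up-front check alone cannot replace it.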
