How to use withTaskCancellationHandler properly?

Let's say, hypothetically, that we did want to use an actor or a global actor to achieve Sendable conformance in this example (though it looks like a lock is the right way to go). I don't yet see a simpler approach that avoids launching two Tasks. Here's an example:

@MainActor
private class URLSessionDataTaskHolder {
    var task: URLSessionDataTask?
    var isCancelled = false

    func cancel() {
        task?.cancel()
        isCancelled = true
    }
}

func data(for request: URLRequest) async throws -> (Data, URLResponse) {
    let dataTaskHolder = await URLSessionDataTaskHolder()

    return try await withTaskCancellationHandler(
        handler: {
            Task { @MainActor in
                dataTaskHolder.cancel()
            }
        },
        operation: {
            try await withCheckedThrowingContinuation { continuation in
                let dataTask = self.dataTask(with: request) { data, response, error in
                    guard let data = data, let response = response else {
                        let error = error ?? URLError(.badServerResponse)
                        return continuation.resume(throwing: error)
                    }

                    continuation.resume(returning: (data, response))
                }
                dataTask.resume()

                Task { @MainActor in
                    dataTaskHolder.task = dataTask
                    if dataTaskHolder.isCancelled {
                        dataTask.cancel()
                    }
                }
            }
        }
    )
}

(Note that this also produces the warning "Capture of 'dataTask' with non-sendable type 'URLSessionDataTask' in a '@Sendable' closure", which I think can be safely suppressed with the upcoming @preconcurrency import Foundation.)

My main questions are:

Is this solution actually safe?
Is there another approach that avoids manual locking and launching multiple Tasks?

No, there is no simpler approach.

It's worth noting that this is not a Sendable conformance issue. Well, the missing Sendable conformance for URLSessionDataTask is, but that's easily fixed with extension URLSessionDataTask: @unchecked Sendable {} (it's documented to be thread-safe) or with @preconcurrency import.

The fundamental issue is that the two contexts in which you're accessing the URLSessionDataTask (the cancellation handler and the body of withCheckedContinuation) are both synchronous. Since you need to be in an async context to message an actor, you have to create a task.
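To make that concrete, here is a minimal sketch (Holder, markCancelled and requestCancellation are hypothetical names, not from the thread). A synchronous function cannot write await holder.markCancelled(), so the only way to reach the actor from it is to spawn an unstructured Task and not wait for it:

```swift
actor Holder {
    private(set) var isCancelled = false

    func markCancelled() {
        isCancelled = true
    }
}

// Synchronous context (like a cancellation handler or the body of
// withCheckedThrowingContinuation): `await` is not allowed here, so the
// actor call is wrapped in an unstructured Task that the caller does not
// wait for. Separate Tasks created this way have no guaranteed order
// relative to each other.
@discardableResult
func requestCancellation(of holder: Holder) -> Task<Void, Never> {
    Task { await holder.markCancelled() }
}
```

The returned Task is only there so a test can await it; a real cancellation handler would simply discard it. The lack of ordering between two such Tasks is exactly what makes this pattern easy to get subtly wrong.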

Yeah that's right.

I'll keep bringing up another potential future solution every time we hit this issue ;-) It is truly important to allow synchronous contexts to send a message to an actor without awaiting, so that such callers can remain synchronous. This could take the form of send actor.cancelWorkPlease(), which enqueues to the actor's mailbox but cannot wait on the result. Such a "send" would be perfect for uni-directional communication, such as cancelling some work.

I (personally) sure hope to get to prototyping and offering such a capability someday.


Including support for “one-way” sends for distributed actors then, presumably ;-)


Yeah, it’d be “many birds with one stone”. But nothing is designed or done there yet, so we’ll see.


Looking at the history of Objective-C, there is actually a precedent here: signatures such as - (oneway void)release. Back in the heady days of Distributed Objects, that was a rare but useful tool.


Indeed! :)

I had forgotten about oneway. Between this and cooperative multitasking, everything old really is new again. :)


I'm beginning to experiment with the new concurrency model and ended up here pretty soon after.

I'm not a fan of mutex locks. How does this solution based on GCD queues look to you?

protocol Cancellable: AnyObject {
    func cancel()
}

final class Canceller: @unchecked Sendable {

    private lazy var serialQueue = DispatchQueue(label: "\(String(describing: Canceller.self))-\(String(describing: ObjectIdentifier(self)))")
    private var isCancelled = false
    private weak var cancellable: (any Cancellable)?

    func cancel() {
        serialQueue.async { [weak self] in
            self?.isCancelled = true
            self?.cancellable?.cancel()
        }
    }

    func setCancellable(_ cancellable: any Cancellable) {
        serialQueue.async { [weak self, weak cancellable] in
            if self?.isCancelled == true {
                cancellable?.cancel()
            }
            self?.cancellable = cancellable
        }
    }
}

//////////////////////////////////////////////

extension URLSessionDataTask: Cancellable {}

extension APIClient {

    func request<R: APIRequest>(_ request: R) async throws -> R.ResponseBody {
        let canceller = Canceller()
        return try await withTaskCancellationHandler(
            handler: {
                canceller.cancel()
            },
            operation: { [weak self] in
                try await withCheckedThrowingContinuation { [weak self] checkedContinuation in
                    guard let self = self else {
                        checkedContinuation.resume(throwing: Incident(.info))
                        return
                    }
                    let task = self.request(request) { result in
                        switch result {
                            case .failure(let error):
                                checkedContinuation.resume(throwing: error)
                            case .success(let response):
                                checkedContinuation.resume(returning: response)
                        }
                        withExtendedLifetime(canceller) {}
                    }
                    canceller.setCancellable(task)
                }
            }
        )
    }
}

This looks correct to me. A few comments:

  • Most (all?) of your uses of [weak self] seem unnecessary. You're not dealing with retain cycles here.
  • I'd use serialQueue.sync over serialQueue.async. The additional overhead of async isn't warranted: the queue is never contended for any significant amount of time, so sync will never block for a meaningful amount of time.
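Applying these comments to the Canceller above might look roughly like the sketch below. It is not the poster's code: SyncCanceller is a hypothetical name chosen to avoid clashing with the original, the queue is a plain let instead of a lazy var, and the stored object's cancel() is called after leaving the queue so that a re-entrant call back into the canceller cannot deadlock the sync call.

```swift
import Dispatch

protocol Cancellable: AnyObject {
    func cancel()
}

final class SyncCanceller: @unchecked Sendable {
    // A plain `let`, created once during init: no racy lazy initialization.
    private let serialQueue = DispatchQueue(label: "SyncCanceller.serialQueue")
    private var isCancelled = false
    private weak var cancellable: (any Cancellable)?

    func cancel() {
        // `sync` runs the tiny critical section without a thread hop when uncontended.
        let target = serialQueue.sync { () -> (any Cancellable)? in
            isCancelled = true
            return cancellable
        }
        // Call out only after leaving the queue.
        target?.cancel()
    }

    func setCancellable(_ cancellable: any Cancellable) {
        let alreadyCancelled = serialQueue.sync { () -> Bool in
            self.cancellable = cancellable
            return isCancelled
        }
        // If cancel() won the race, cancel the newly registered object right away.
        if alreadyCancelled {
            cancellable.cancel()
        }
    }
}
```

Either call order ends with the registered object cancelled, which is the property the original async-based version relied on.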

I understand, but an NSLock instead of the dispatch queue isn't any harder to use and has less overhead:

final class Canceller: @unchecked Sendable {
    private let lock = NSLock()
    …

    func cancel() {
        lock.lock()
        self.isCancelled = true
        self.cancellable?.cancel()
        lock.unlock()
    }

    …
}

I wouldn't use lazy var here, as lazy initialization is not thread-safe.

Thank you, Ole, for looking into this.

You're not dealing with retain cycles here.

Yeah, I've already moved away from this. I know these weak captures are unnecessary, but I tend not to retain what I don't need to when it's as convenient as one keyword. After all, there is no reason to retain an object just to cancel it: what was released doesn't need to be cancelled. Anyway, I had a case of a function with parallel requests that returns an array of tasks, so I lifted the class restriction on Cancellable. It doesn't need to be a class anymore, and the weak references are gone, rather than using an NSPointerArray with weak memory.

I'd use serialQueue.sync over serialQueue.async. The additional overhead of async isn't warranted: the queue is never contended for any significant amount of time, so sync will never block for a meaningful amount of time.

Can you elaborate on this? What is the additional overhead? I know sync will never block for a meaningful amount of time, but I still don't see a reason to wait for it. Or do you mean that async is so much more expensive than sync that sync pays off overall?

And thank you for the lock example!

queue.async always involves a thread switch and often the spawning of a new thread. The overhead of this is vastly higher than the tiny amount of work you're performing. queue.sync on the other hand can use the current thread to perform its work. In the common case where the queue isn't contended, sync is way more efficient.

See libdispatch efficiency tips, compiled by @tclementdev, for more, especially "Don't use async to protect shared state" and "Don't use async for small tasks".

Wow, thank you. I didn't know that. The more I think about it, the more sense it makes.

Hi @ole, first of all, thanks for all the discussion in this thread; it has been very useful. I have one question regarding the data race when accessing dataTaskHolder.task from different concurrent contexts. Could you explain what kind of data race issues can happen if we don't lock access to it? Thank you.

Data races have undefined behavior, so technically anything can happen. See e.g. this answer from @John_McCall: Race condition behaviors - #13 by John_McCall

In the code in question, one concurrency context assigns a class instance to an unsynchronized property (which is previously nil), and another concurrency context calls a method on that property (if not nil). I don't know whether anything would actually go wrong with this in practice, but even if it doesn't right now, that may change once the code is compiled with a different compiler version or optimization level.

I personally wrap both resume and cancel within the Sendable wrapper (an actor in my case), avoiding races between resuming, assigning the task, and cancelation:

extension URLSession {
    @available(iOS, deprecated: 15, message: "Use `data(from:delegate:)` instead")
    func data(with url: URL) async throws -> (Data, URLResponse) {
        try await data(with: URLRequest(url: url))
    }

    @available(iOS, deprecated: 15, message: "Use `data(for:delegate:)` instead")
    func data(with request: URLRequest) async throws -> (Data, URLResponse) {
        let sessionTask = SendableURLSessionTask()

        return try await withTaskCancellationHandler {
            try await withCheckedThrowingContinuation { continuation in
                Task {
                    await sessionTask.start(dataTask(with: request) { data, response, error in
                        guard let data = data, let response = response else {
                            continuation.resume(throwing: error ?? URLError(.badServerResponse))
                            return
                        }

                        continuation.resume(returning: (data, response))
                    })
                }
            }
        } onCancel: {
            Task { await sessionTask.cancel() }
        }
    }
}

private extension URLSession {
    actor SendableURLSessionTask {
        weak var task: URLSessionTask?

        func start(_ task: URLSessionTask) {
            self.task = task
            task.resume()
        }

        func cancel() {
            task?.cancel()
        }
    }
}

Yes, there are two Task { ... } calls, but the actor synchronizes all of this, avoiding race conditions. You could also do this without Task, using an @unchecked Sendable class with locks (but make sure to synchronize both resume and cancel); the actor just feels a little more native to me.

My two cents.
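For comparison, the lock-based alternative mentioned above might look roughly like this sketch. LockedTaskBox and lockedData(for:) are hypothetical names, and it relies on URLSessionTask being safe to call from multiple threads (as noted earlier in the thread). Both box methods are synchronous, so they can be called from onCancel and from inside the continuation body without spawning any Task.

```swift
import Foundation
#if canImport(FoundationNetworking)
import FoundationNetworking
#endif

final class LockedTaskBox: @unchecked Sendable {
    private let lock = NSLock()
    private var task: URLSessionTask?
    private var isCancelled = false

    /// Records the started task; cancels it at once if cancellation already happened.
    func setTask(_ task: URLSessionTask) {
        lock.lock()
        self.task = task
        let alreadyCancelled = isCancelled
        lock.unlock()
        if alreadyCancelled {
            task.cancel()
        }
    }

    /// Marks the box as cancelled and cancels the task if one was recorded.
    func cancel() {
        lock.lock()
        isCancelled = true
        let task = self.task
        lock.unlock()
        task?.cancel()
    }
}

extension URLSession {
    func lockedData(for request: URLRequest) async throws -> (Data, URLResponse) {
        let box = LockedTaskBox()
        return try await withTaskCancellationHandler {
            try await withCheckedThrowingContinuation { continuation in
                let task = self.dataTask(with: request) { data, response, error in
                    guard let data = data, let response = response else {
                        continuation.resume(throwing: error ?? URLError(.badServerResponse))
                        return
                    }
                    continuation.resume(returning: (data, response))
                }
                task.resume()
                // Registered after resume(), so any cancel() that reaches the
                // task is issued after it has started.
                box.setTask(task)
            }
        } onCancel: {
            box.cancel()
        }
    }
}
```

Because the flag and the task reference are updated under the same lock, a cancellation that arrives before setTask is not lost: setTask sees isCancelled and cancels the task itself.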


I also just ran into a similar scenario, where it took me a while to figure out how to properly forward cancellation to ASWebAuthenticationSession.

I went with the approach @robert.ryan posted; however, I think it's incomplete. The actor also has to track whether cancellation was already issued; otherwise the cancellation could be missed, because the Task that runs start could execute after the Task that executes cancel. A simple extension of the actor with a Boolean state does the trick. Furthermore, if the start method does not start the URLSessionTask (or ASWebAuthenticationSession in my case), it has to report that, so that we can still signal the continuation and resume with a cancellation error.

Here's a more generic actor type that isn't bound to a specific type requiring this dance:


actor CancellationHandler {
    var isCancelled: Bool = false
    var performCancellation: ((Bool) -> Void)? = nil
    
    func perform(
        _ operation: @Sendable () -> Void, 
        onCancel performCancellation: @escaping @Sendable (Bool) -> Void 
    ) {
        if isCancelled {
            performCancellation(/* didExecuteOperation: */ false)
        } else {
            self.performCancellation = performCancellation
            operation()
        }
    }
    
    func cancel() {
        if !isCancelled, let performCancellation {
            performCancellation(/* didExecuteOperation: */true)
        }
        isCancelled = true
    }
}

FWIW, DevAndArtist is correct. According to SE-0304 (Cancellation handlers):

If the task has already been cancelled at the point withTaskCancellationHandler is called, the cancellation handler is invoked immediately, before the operation block is executed.
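A small experiment makes the quoted rule concrete. The sketch below (EventLog and runAlreadyCancelledTask are hypothetical names) cancels a task before it reaches withTaskCancellationHandler and records the order of calls; per SE-0304, "onCancel" should be logged before "operation". In the URLSession code earlier in the thread, that means the Task spawned from onCancel can be created before the Task that starts the request, and nothing guarantees which of the two runs first.

```swift
import Foundation

/// Thread-safe log used only to record call order.
final class EventLog: @unchecked Sendable {
    private let lock = NSLock()
    private var storage: [String] = []

    func append(_ event: String) {
        lock.lock()
        storage.append(event)
        lock.unlock()
    }

    var events: [String] {
        lock.lock()
        defer { lock.unlock() }
        return storage
    }
}

func runAlreadyCancelledTask() async -> [String] {
    let log = EventLog()
    let task = Task {
        // Task.sleep throws once the task is cancelled; the error is ignored.
        // After this line the task is guaranteed to be in the cancelled state.
        try? await Task.sleep(nanoseconds: 5_000_000_000)
        await withTaskCancellationHandler {
            log.append("operation")
        } onCancel: {
            log.append("onCancel")
        }
    }
    task.cancel()
    await task.value
    return log.events
}
```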

So, one would need to check the cancelation state before starting the network request:

extension URLSession {
    @available(iOS, deprecated: 15, message: "Use `data(from:delegate:)` instead")
    func data(with url: URL) async throws -> (Data, URLResponse) {
        try await data(with: URLRequest(url: url))
    }

    @available(iOS, deprecated: 15, message: "Use `data(for:delegate:)` instead")
    func data(with request: URLRequest) async throws -> (Data, URLResponse) {
        let sessionTask = SessionTask()

        return try await withTaskCancellationHandler {
            try await withCheckedThrowingContinuation { continuation in
                Task {
                    await sessionTask.start(request, on: self) { data, response, error in
                        guard let data = data, let response = response else {
                            continuation.resume(throwing: error ?? URLError(.badServerResponse))
                            return
                        }

                        continuation.resume(returning: (data, response))
                    }
                }
            }
        } onCancel: {
            Task { await sessionTask.cancel() }
        }
    }
}

private extension URLSession {
    actor SessionTask {
        var state: State = .ready

        func start(_ request: URLRequest, on session: URLSession, completionHandler: @Sendable @escaping (Data?, URLResponse?, Error?) -> Void) {
            if case .cancelled = state {
                completionHandler(nil, nil, CancellationError())
                return
            }

            let task = session.dataTask(with: request, completionHandler: completionHandler)

            state = .executing(task)
            task.resume()
        }

        func cancel() {
            if case .executing(let task) = state {
                task.cancel()
            }
            state = .cancelled
        }
    }

    enum State {
        case ready
        case executing(URLSessionTask)
        case cancelled
    }
}