How to cancel a publisher when using withTaskCancellationHandler

I'd like to be able to cancel a publisher that I kick off when implementing an async method:

public func query(parameters: QueryParameters) async throws -> FeatureQueryResult {
    var sub: AnyCancellable?
    return try await withTaskCancellationHandler {
        sub?.cancel()
    } operation: {
        try await withCheckedThrowingContinuation { continuation in
            ...
            sub = myPublisher.sink { completion in
                ...
                continuation.resume(with: myResult)
            } receiveValue: { _ in }
        }
    }
}

However, this gives me the following error:

Reference to captured var 'sub' in concurrently-executing code

I saw this basic pattern mentioned here.


There’s an updated version of this snippet in the current Structured Concurrency proposal. I can’t test it right now but I hope it’s valid. :-)

Thanks. No, that code doesn't work as far as I can tell. The parameter order has been reversed, and the code doesn't await the call to withTaskCancellationHandler. Other than that, it seems basically identical to what I've got and what you originally wrote. Unless I'm missing something...

Also, if I comment out the call to cancel():

public func query(parameters: QueryParameters) async throws -> FeatureQueryResult {
    var sub: AnyCancellable?
    return try await withTaskCancellationHandler {
        //sub?.cancel()
    } operation: {
        try await withCheckedThrowingContinuation { continuation in
            ...
            sub = myPublisher.sink { completion in
                ...
                continuation.resume(with: myResult)
            } receiveValue: { _ in }
        }
    }
}

Then it compiles but I get a warning:

Variable 'sub' was written to, but never read

Then of course if I don't hold onto the cancellable at all:

public func query(parameters: QueryParameters) async throws -> FeatureQueryResult {
    return try await withTaskCancellationHandler {
    } operation: {
        try await withCheckedThrowingContinuation { continuation in
            ...
            let _ = myPublisher.sink { completion in
                ...
                continuation.resume(with: myResult)
            } receiveValue: { _ in }
        }
    }
}

Then I end up with a runtime error:

SWIFT TASK CONTINUATION MISUSE: queryFeatures(parameters:) leaked its continuation!
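
The leak in this last variant is most likely explained by how AnyCancellable behaves: it cancels its subscription when it is deallocated, and `let _ =` discards it immediately, so neither sink closure ever runs and the continuation is never resumed. A minimal sketch of that behavior, assuming a PassthroughSubject as a stand-in for myPublisher (the names `subject`, `received`, and `kept` are illustrative only):

```swift
import Combine

// Hypothetical stand-in for `myPublisher`.
let subject = PassthroughSubject<Int, Never>()
var received: [Int] = []

// The AnyCancellable is discarded right away, so its deinit cancels
// the subscription before any value can be delivered.
_ = subject.sink { received.append($0) }
subject.send(1) // no live subscriber: the value is dropped

// Keeping the AnyCancellable alive keeps the subscription alive.
let kept = subject.sink { received.append($0) }
withExtendedLifetime(kept) {
    subject.send(2)
}

print(received) // prints "[2]"
```

So the cancellable has to be stored somewhere that outlives the call to sink, which is what the `sub` variable in the earlier attempts was for.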

Here is an implementation with cancellation support that seems to work, though I'm not sure whether it's thread safe. Does anybody have comments on the validity of this approach?


enum AsyncError: Error {
    case valueWasNotEmittedBeforeCompletion
}

class CancellableWrapper {
    var cancellable: AnyCancellable?
}


extension Publisher {
    var first: Output {
        get async throws {
            // Variable tracks if we sent a value or not.
            var didSendValue: Bool = false
            let cancellableWrapper = CancellableWrapper()
            return try await withTaskCancellationHandler {
                cancellableWrapper.cancellable?.cancel()
            } operation: {
                // This check is necessary in case this code runs after the task was
                // cancelled. In which case we want to bail right away.
                try Task.checkCancellation()
                
                return try await withUnsafeThrowingContinuation { continuation in
                    // This check is necessary in case this code runs after the task was
                    // cancelled. In which case we want to bail right away.
                    guard !Task.isCancelled else {
                        continuation.resume(throwing: Task.CancellationError())
                        return
                    }
                    
                    cancellableWrapper.cancellable =
                    handleEvents(receiveCancel: {
                        // We don't get a cancel error when cancelling a publisher, so we need
                        // to handle if the publisher was cancelled from the
                        // `withTaskCancellationHandler` here.
                        continuation.resume(throwing: Task.CancellationError())
                    }).sink { completion in
                        if case let .failure(error) = completion {
                            continuation.resume(throwing: error)
                        } else if !didSendValue {
                            continuation.resume(throwing: AsyncError.valueWasNotEmittedBeforeCompletion)
                        }
                    } receiveValue: { value in
                        continuation.resume(with: .success(value))
                        didSendValue = true
                    }
                }
            }
        }
    }
}
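
On the thread-safety question, a few things in the code above look racy as far as I can tell: `cancellableWrapper.cancellable` is read by the cancellation handler and written by the operation without synchronization, `didSendValue` is a captured var mutated from the sink, and the unsafe continuation can be resumed more than once (for example a value followed by a later cancel, or several values), which is undefined behavior. Below is a sketch of one way to guard against that, not a definitive implementation. It keeps all shared state behind an NSLock and lets only the first event resume the continuation. `FirstValueState`, `FirstValueError`, and `firstValue()` are hypothetical names invented for this sketch, it uses the shipping `withTaskCancellationHandler(operation:onCancel:)` label order, and it has not been checked against Swift 6 strict concurrency.

```swift
import Combine
import Foundation

// Hypothetical error for a publisher that finishes without emitting.
enum FirstValueError: Error {
    case finishedWithoutValue
}

// Hypothetical helper: every access to the shared state goes through the lock.
final class FirstValueState<Output>: @unchecked Sendable {
    private let lock = NSLock()
    private var continuation: CheckedContinuation<Output, Error>?
    private var cancellable: AnyCancellable?
    private var isFinished = false

    // Stores the continuation. Returns false (after resuming with a
    // cancellation error) if the task was cancelled before we got here.
    func begin(_ continuation: CheckedContinuation<Output, Error>) -> Bool {
        lock.lock()
        if isFinished {
            lock.unlock()
            continuation.resume(throwing: CancellationError())
            return false
        }
        self.continuation = continuation
        lock.unlock()
        return true
    }

    // Keeps the subscription alive, or cancels it if we already finished.
    func store(_ cancellable: AnyCancellable) {
        lock.lock()
        let finished = isFinished
        if !finished { self.cancellable = cancellable }
        lock.unlock()
        if finished { cancellable.cancel() }
    }

    // Only the first call resumes the continuation; later calls are no-ops.
    func finish(with result: Result<Output, Error>) {
        lock.lock()
        let continuation = self.continuation
        let cancellable = self.cancellable
        self.continuation = nil
        self.cancellable = nil
        isFinished = true
        lock.unlock()
        // Called outside the lock so a re-entrant callback cannot deadlock.
        cancellable?.cancel()
        continuation?.resume(with: result)
    }
}

extension Publisher {
    // Hypothetical name; awaits the first value or throws.
    func firstValue() async throws -> Output {
        let state = FirstValueState<Output>()
        return try await withTaskCancellationHandler {
            try await withCheckedThrowingContinuation { continuation in
                guard state.begin(continuation) else { return }
                let cancellable = self.sink { completion in
                    switch completion {
                    case .failure(let error):
                        state.finish(with: .failure(error))
                    case .finished:
                        state.finish(with: .failure(FirstValueError.finishedWithoutValue))
                    }
                } receiveValue: { value in
                    state.finish(with: .success(value))
                }
                state.store(cancellable)
            }
        } onCancel: {
            state.finish(with: .failure(CancellationError()))
        }
    }
}
```

With this shape, `try await somePublisher.firstValue()` should throw CancellationError if the task is cancelled first, and the handleEvents(receiveCancel:) hook is no longer needed because the cancellation handler resumes the continuation itself.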