How to cancel a publisher when using withTaskCancellationHandler

Here is an implementation with cancel support that seems to work. I'm not sure it's thread safe? Does anybody have a comment on the validity of this approach?


enum AsyncError: Error {
    case valueWasNotEmittedBeforeCompletion
}

class CancellableWrapper {
    var cancellable: AnyCancellable?
}


extension Publisher {
    var first: Output {
        get async throws {
            // Variable tracks if we sent a value or not.
            var didSendValue: Bool = false
            let cancellableWrapper = CancellableWrapper()
            return try await withTaskCancellationHandler {
                cancellableWrapper.cancellable?.cancel()
            } operation: {
                // This check is necessary in case this code runs after the task was
                // cancelled. In which case we want to bail right away.
                try Task.checkCancellation()
                
                return try await withUnsafeThrowingContinuation { continuation in
                    // This check is necessary in case this code runs after the task was
                    // cancelled. In which case we want to bail right away.
                    guard !Task.isCancelled else {
                        continuation.resume(throwing: Task.CancellationError())
                        return
                    }
                    
                    cancellableWrapper.cancellable =
                    handleEvents(receiveCancel: {
                        // We don't get a cancel error when cancelling a publisher, so we need
                        // to handle if the publisher was cancelled from the
                        // `withTaskCancellationHandler` here.
                        continuation.resume(throwing: Task.CancellationError())
                    }).sink { completion in
                        if case let .failure(error) = completion {
                            continuation.resume(throwing: error)
                        } else if !didSendValue {
                            continuation.resume(throwing: AsyncError.valueWasNotEmittedBeforeCompletion)
                        }
                    } receiveValue: { value in
                        continuation.resume(with: .success(value))
                        didSendValue = true
                    }
                }
            }
        }
    }
}
1 Like