Thanks. No, that code doesn't work as far as I can tell. The parameter order has been reversed, the code doesn't await the call to withTaskCancellationHandler. Other than that it basically seems identical to what I've got and what you originally wrote. Unless I'm missing something...
Here is an implementation with cancel support that seems to work. I'm not sure it's thread safe? Does anybody have a comment on the validity of this approach?
enum AsyncError: Error {
case valueWasNotEmittedBeforeCompletion
}
class CancellableWrapper {
var cancellable: AnyCancellable?
}
extension Publisher {
var first: Output {
get async throws {
// Variable tracks if we sent a value or not.
var didSendValue: Bool = false
let cancellableWrapper = CancellableWrapper()
return try await withTaskCancellationHandler {
cancellableWrapper.cancellable?.cancel()
} operation: {
// This check is necessary in case this code runs after the task was
// cancelled. In which case we want to bail right away.
try Task.checkCancellation()
return try await withUnsafeThrowingContinuation { continuation in
// This check is necessary in case this code runs after the task was
// cancelled. In which case we want to bail right away.
guard !Task.isCancelled else {
continuation.resume(throwing: Task.CancellationError())
return
}
cancellableWrapper.cancellable =
handleEvents(receiveCancel: {
// We don't get a cancel error when cancelling a publisher, so we need
// to handle if the publisher was cancelled from the
// `withTaskCancellationHandler` here.
continuation.resume(throwing: Task.CancellationError())
}).sink { completion in
if case let .failure(error) = completion {
continuation.resume(throwing: error)
} else if !didSendValue {
continuation.resume(throwing: AsyncError.valueWasNotEmittedBeforeCompletion)
}
} receiveValue: { value in
continuation.resume(with: .success(value))
didSendValue = true
}
}
}
}
}
}