Here is an implementation with cancel support that seems to work. I'm not sure it's thread safe? Does anybody have a comment on the validity of this approach?
enum AsyncError: Error {
case valueWasNotEmittedBeforeCompletion
}
class CancellableWrapper {
var cancellable: AnyCancellable?
}
extension Publisher {
var first: Output {
get async throws {
// Variable tracks if we sent a value or not.
var didSendValue: Bool = false
let cancellableWrapper = CancellableWrapper()
return try await withTaskCancellationHandler {
cancellableWrapper.cancellable?.cancel()
} operation: {
// This check is necessary in case this code runs after the task was
// cancelled. In which case we want to bail right away.
try Task.checkCancellation()
return try await withUnsafeThrowingContinuation { continuation in
// This check is necessary in case this code runs after the task was
// cancelled. In which case we want to bail right away.
guard !Task.isCancelled else {
continuation.resume(throwing: Task.CancellationError())
return
}
cancellableWrapper.cancellable =
handleEvents(receiveCancel: {
// We don't get a cancel error when cancelling a publisher, so we need
// to handle if the publisher was cancelled from the
// `withTaskCancellationHandler` here.
continuation.resume(throwing: Task.CancellationError())
}).sink { completion in
if case let .failure(error) = completion {
continuation.resume(throwing: error)
} else if !didSendValue {
continuation.resume(throwing: AsyncError.valueWasNotEmittedBeforeCompletion)
}
} receiveValue: { value in
continuation.resume(with: .success(value))
didSendValue = true
}
}
}
}
}
}