@ryanashcraft Our team took this solution and got it working for .values
as well:
public final class Eventually<Output, Failure: Error>: Publisher {
public typealias Promise = (Result<Output, Failure>) -> Void
private let current = CurrentValueSubject<Output?, Failure>(nil)
private let output: AnyPublisher<Output, Failure>
private let lock = NSRecursiveLock()
public init(_ attemptToFulfill: @escaping ((@escaping Promise) -> Void)) {
let promise: Promise = { [current, lock] result in
lock.lock()
defer { lock.unlock() }
switch result {
case let .success(value):
current.send(value)
case let .failure(error):
current.send(completion: .failure(error))
}
}
defer {
attemptToFulfill(promise)
}
self.output = current
.compactMap { $0 }
.first()
.eraseToAnyPublisher()
}
// MARK: Publisher
public func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
lock.lock()
defer { lock.unlock() }
if let value = current.value {
let just = Just(value).setFailureType(to: Failure.self)
just.receive(subscriber: subscriber)
} else {
output.receive(subscriber: subscriber)
}
}
}
We removed completing the CurrentValueSubject
with current.send(completion: .finished)
and added .first()
to the output
publisher, as first()
finishes the publisher after the first element.
My colleague spent the time figuring this out and had this to say:
Calling send(value) and send(completion) on CurrentValueSubject in quick succession appears to be racy with downstream consumers, meaning, some of the time, all they will see is a completed publisher (buffer has no effect, either). Converting that completed publisher to stream, and using it in for each, in the unfortunate case, means the loop is skipped all together. This appears to address the issue.
We're going to begin using this solution and will report back if there are any issues.