I built a half baked collectLatest algorithm which works great and I run into an issue where I kinda had to mimic flatMapLatest (RxSwift) or map().switchToLatest() (Combine). I didn't want to rely on a 3rd party implementation so I had to build my own one. My implementation is very simplified and makes use of collectLatest as a building block.
extension AsyncSequence
where
Self: Sendable,
Element: AsyncSequence & Sendable,
Element.Element: Sendable
{
public func switchToLatest() -> AsyncStream<Element.Element> {
AsyncStream { continuation in
let task = Task {
await self.collectLatest { sequence in
// Create the iterator from the sequence.
var iterator = sequence.makeAsyncIterator()
// Start consuming all its values while ignoring all errors.
while true {
do {
while let element = try await iterator.next() {
// Check if the parent task was cancelled.
try Task.checkCancellation()
// Yield new element.
continuation.yield(element)
}
// When the inner while loop ends break from the outer one.
break
} catch is CancellationError {
// In case the thrown error was a cancelation, then break free from the loop.
break
} catch {
assertionFailure("\(error)")
}
}
}
// Finish the stream.
continuation.finish()
}
continuation.setOnTermination { termination in
if termination == .cancelled, !task.isCancelled {
task.cancel()
}
}
}
}
}
Again, errors are ignored here.