Im working on a codebase that makes extensive use of RxSwift while gradually introducing Swifts Structured Concurrency i was looking for a equivalent of flatMapLatest
I think a good example is a search-field thats performs a server call to load results.
The desired behaviour would be:
When i type something into the field it immediately loads results from the server
When i then change the text while the server-call is still in progress
The first server call is cancelled (or its result simply ignored) and it issues a new server call using the new input
The could i would like to write for this would
func loadResults(searchTerm: String) async throws -> [SearchResult] {.... }
let querySequence: AsyncSequence<String> = ...
let resultsSequeunce = querySequence.mapLatest {
try await loadResults(searchTerm: $0)
}
for results in resultsSequeunce {
populateUI(results)
}
I think adding two algorithms like this would be a useful addition
You can give a look at an implementation here. I intend to write a proposal at some point to integrate it in this repo (it might need some adjustments code wise though regarding Sendability)
In general, +1 on this. It is very much needed. Right now the focus is on stabilising the current algorithms to a 1.0 release and this should be considered right after IMO.
I built a half baked collectLatest algorithm which works great and I run into an issue where I kinda had to mimic flatMapLatest (RxSwift) or map().switchToLatest() (Combine). I didn't want to rely on a 3rd party implementation so I had to build my own one. My implementation is very simplified and makes use of collectLatest as a building block.
extension AsyncSequence
where
Self: Sendable,
Element: AsyncSequence & Sendable,
Element.Element: Sendable
{
public func switchToLatest() -> AsyncStream<Element.Element> {
AsyncStream { continuation in
let task = Task {
await self.collectLatest { sequence in
// Create the iterator from the sequence.
var iterator = sequence.makeAsyncIterator()
// Start consuming all its values while ignoring all errors.
while true {
do {
while let element = try await iterator.next() {
// Check if the parent task was cancelled.
try Task.checkCancellation()
// Yield new element.
continuation.yield(element)
}
// When the inner while loop ends break from the outer one.
break
} catch is CancellationError {
// In case the thrown error was a cancelation, then break free from the loop.
break
} catch {
assertionFailure("\(error)")
}
}
}
// Finish the stream.
continuation.finish()
}
continuation.setOnTermination { termination in
if termination == .cancelled, !task.isCancelled {
task.cancel()
}
}
}
}
}