PrePitch: Introduce `(flat)mapLatest` Algorithms

I'm working on a codebase that makes extensive use of RxSwift while gradually introducing Swift's Structured Concurrency, and I was looking for an equivalent of flatMapLatest.

I think a good example is a search field that performs a server call to load results.
The desired behaviour would be:

  • When I type something into the field, it immediately loads results from the server.
  • When I then change the text while the server call is still in progress, the first server call is cancelled (or its result is simply ignored) and a new server call is issued using the new input.
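
For comparison, without a dedicated operator this behaviour is usually hand-rolled by keeping a handle to the in-flight task and cancelling it on every new input. The following is only a minimal sketch of that approach; the `LatestSearch` actor and its `load` closure are hypothetical names introduced for illustration, and results are modelled as plain strings.

```swift
// Hypothetical helper: keeps at most one search in flight.
actor LatestSearch {
  private var inFlight: Task<[String], Error>?
  private let load: @Sendable (String) async throws -> [String]

  init(load: @escaping @Sendable (String) async throws -> [String]) {
    self.load = load
  }

  /// Cancels the previous search (if any) and starts a new one.
  /// A superseded call throws `CancellationError` instead of returning.
  func search(_ term: String) async throws -> [String] {
    inFlight?.cancel()
    let task = Task { [load] in
      let results = try await load(term)
      // Discard the result if a newer search replaced this one meanwhile.
      try Task.checkCancellation()
      return results
    }
    inFlight = task
    return try await withTaskCancellationHandler {
      try await task.value
    } onCancel: {
      // Forward cancellation of the caller to the unstructured task.
      task.cancel()
    }
  }
}
```

This works, but every call site has to repeat the bookkeeping, which is the part an operator on `AsyncSequence` would hide.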

The code I would like to write for this would be:


func loadResults(searchTerm: String) async throws -> [SearchResult] { ... }

let querySequence: AsyncSequence<String> = ...
let resultsSequence = querySequence.mapLatest {
    try await loadResults(searchTerm: $0)
}

// The transform can throw, so iterating requires `try await`.
for try await results in resultsSequence {
  populateUI(results)
}

I think adding two algorithms like these would be a useful addition:

// signatures are simplified a bit 

extension AsyncSequence {
  func mapLatest(_ transform: (Element) async -> Transformed) -> AsyncSequence<Transformed> {
    ...
  }

  func flatMapLatest(_ transform: (Element) async -> AsyncSequence<Transformed>) -> AsyncSequence<Transformed> {
    ...
  }
}
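
To make the proposed semantics concrete, here is a minimal sketch of how `mapLatest` could behave, assuming an `AsyncThrowingStream`-backed implementation. The `Sendable` constraints, the return type, and the choice to wait for a cancelled transform to unwind before starting the next one are assumptions of this sketch, not part of the pitch.

```swift
extension AsyncSequence where Self: Sendable, Element: Sendable {
  /// Sketch only: maps each element with `transform`, cancelling the
  /// in-flight transform whenever a newer element arrives.
  func mapLatest<Transformed: Sendable>(
    _ transform: @escaping @Sendable (Element) async throws -> Transformed
  ) -> AsyncThrowingStream<Transformed, Error> {
    AsyncThrowingStream { continuation in
      let driver = Task {
        var current: Task<Void, Never>?
        do {
          for try await element in self {
            // Cancel the previous transform and wait until it has unwound,
            // so a stale value can never be emitted after a newer one.
            current?.cancel()
            await current?.value
            current = Task {
              do {
                let value = try await transform(element)
                if !Task.isCancelled { continuation.yield(value) }
              } catch is CancellationError {
                // Superseded by a newer element: nothing to emit.
              } catch {
                continuation.finish(throwing: error)
              }
            }
          }
          // Upstream finished: let the last transform complete.
          let last = current
          await withTaskCancellationHandler {
            await last?.value
          } onCancel: {
            last?.cancel()
          }
          continuation.finish()
        } catch {
          // Upstream failed: stop the transform and forward the error.
          current?.cancel()
          continuation.finish(throwing: error)
        }
      }
      // Stop all work once the consumer stops iterating.
      continuation.onTermination = { _ in driver.cancel() }
    }
  }
}
```

One trade-off of this sketch: a transform that does not cooperate with cancellation delays the next one, because the driver waits for it to return. A real implementation would probably use a state machine instead of unstructured tasks.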

What do you think?


Hi

I agree with the need.

You can take a look at an implementation here. I intend to write a proposal at some point to integrate it into this repo (the code might need some adjustments regarding Sendability, though).

In general, +1 on this. It is very much needed. Right now the focus is on stabilising the current algorithms to a 1.0 release and this should be considered right after IMO.


I built a half-baked collectLatest algorithm which works great, and I ran into an issue where I kind of had to mimic flatMapLatest (RxSwift) or map().switchToLatest() (Combine). I didn't want to rely on a third-party implementation, so I had to build my own. My implementation is very simplified and uses collectLatest as a building block.

extension AsyncSequence
  where
  Self: Sendable,
  Element: AsyncSequence & Sendable,
  Element.Element: Sendable
{
  public func switchToLatest() -> AsyncStream<Element.Element> {
    AsyncStream { continuation in
      let task = Task {
        await self.collectLatest { sequence in
          // Create the iterator from the sequence.
          var iterator = sequence.makeAsyncIterator()

          // Start consuming all its values while ignoring all errors.
          while true {
            do {
              while let element = try await iterator.next() {
                // Check if the parent task was cancelled.
                try Task.checkCancellation()

                // Yield new element.
                continuation.yield(element)
              }
              // When the inner while loop ends break from the outer one.
              break
            } catch is CancellationError {
              // If the thrown error was a cancellation, break out of the loop.
              break
            } catch {
              assertionFailure("\(error)")
            }
          }
        }

        // Finish the stream.
        continuation.finish()
      }
      // `onTermination` is the standard-library hook on AsyncStream.Continuation.
      continuation.onTermination = { termination in
        if termination == .cancelled, !task.isCancelled {
          task.cancel()
        }
      }
    }
  }
}

Again, errors are ignored here.
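
If ignoring errors is not acceptable, one possible variant forwards them to the consumer through an `AsyncThrowingStream`. This is a rough sketch under that assumption; it does not use collectLatest, and the name `switchToLatestForwardingErrors` is made up for illustration.

```swift
extension AsyncSequence
  where
  Self: Sendable,
  Element: AsyncSequence & Sendable,
  Element.Element: Sendable
{
  /// Sketch only: emits the elements of the most recent inner sequence and
  /// finishes with the first error thrown by the outer or an inner sequence.
  public func switchToLatestForwardingErrors()
    -> AsyncThrowingStream<Element.Element, Error>
  {
    AsyncThrowingStream { continuation in
      let driver = Task {
        var inner: Task<Void, Never>?
        do {
          for try await sequence in self {
            // Stop consuming the previous inner sequence before switching.
            inner?.cancel()
            await inner?.value
            inner = Task {
              do {
                for try await element in sequence {
                  if Task.isCancelled { return }
                  continuation.yield(element)
                }
              } catch is CancellationError {
                // Replaced by a newer inner sequence.
              } catch {
                continuation.finish(throwing: error)
              }
            }
          }
          // Outer sequence finished: drain the last inner sequence.
          let last = inner
          await withTaskCancellationHandler {
            await last?.value
          } onCancel: {
            last?.cancel()
          }
          continuation.finish()
        } catch {
          inner?.cancel()
          continuation.finish(throwing: error)
        }
      }
      continuation.onTermination = { _ in driver.cancel() }
    }
  }
}
```

Combined with a plain `map`, this gives the flatMapLatest shape: `queries.map { makeInnerSequence($0) }.switchToLatestForwardingErrors()`, where `makeInnerSequence` stands for whatever produces the inner sequence.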