[Pitch] `collectLatest`

In our application there were the need for collectLatest algorithm. While working it out under a different name I was pointed out by @manmal that the algorithm I was working was similar to Kotlin's collectLatest.

My implementation is based on the building blocks we already have so it's not a a fully fledged standalone AsyncSequence type.

extension AsyncSequence where Element: Sendable {
  public func collectLatest(
    _ consuming: @escaping @Sendable (Element) async -> Void
  ) async {
    // Exit early if the task has already been cancelled.
    guard !Task.isCancelled else {
      return
    }

    var task: Task<Void, Never>? = nil

    // Helper function to cancel the active task and await it to finish.
    func cancelAndReleaseTask() async {
      if let task {
        task.cancel()
        await task.value
      }
      task = nil
    }

    do {
      // Iterate over the async sequence.
      for try await element in self {
        // Cancel an active task before spawning a new one.
        await cancelAndReleaseTask()

        // Check if the parent task was cancelled.
        if Task.isCancelled {
          break
        }

        // Spawn a new task.
        task = Task {
          await consuming(element)
        }
      }
      
    } catch {
        // nothing there
    }

    // Cancel any active task if sequence finished.
    await cancelAndReleaseTask()
  }
}

In this case I do ignore the errors. Propagating errors with AsyncSequence is a bit mind bending to me without precise error typing in Swift.

I'd love to see something like this officially be included into the algorithm package.

2 Likes