In our application there were the need for collectLatest
algorithm. While working it out under a different name I was pointed out by @manmal that the algorithm I was working was similar to Kotlin's collectLatest
.
My implementation is based on the building blocks we already have so it's not a a fully fledged standalone AsyncSequence
type.
extension AsyncSequence where Element: Sendable {
public func collectLatest(
_ consuming: @escaping @Sendable (Element) async -> Void
) async {
// Exit early if the task has already been cancelled.
guard !Task.isCancelled else {
return
}
var task: Task<Void, Never>? = nil
// Helper function to cancel the active task and await it to finish.
func cancelAndReleaseTask() async {
if let task {
task.cancel()
await task.value
}
task = nil
}
do {
// Iterate over the async sequence.
for try await element in self {
// Cancel an active task before spawning a new one.
await cancelAndReleaseTask()
// Check if the parent task was cancelled.
if Task.isCancelled {
break
}
// Spawn a new task.
task = Task {
await consuming(element)
}
}
} catch {
// nothing there
}
// Cancel any active task if sequence finished.
await cancelAndReleaseTask()
}
}
In this case I do ignore the errors. Propagating errors with AsyncSequence
is a bit mind bending to me without precise error typing in Swift.
I'd love to see something like this officially be included into the algorithm package.