Throttling or debouncing async sequences?

func beginUpdatingSearchResults() {
    async {
        let notifications = NotificationCenter
            .default
            .notifications(named: UITextField.textDidChangeNotification, object: searchTermTextField)
        
        for try await note in notifications {
            print("text did change, making api request \(note)")
        }
    }
}

Say this is called in viewDidLoad() so that as long as the searchTermTextField is alive, I get notifications of the text changing. Obviously I do not want to hit the API every time the searchTermTextField changes. In reactive programming, I'd use a debouce or throttle operator.

Does anyone have any suggestions for achieving equivalent behaviour with async sequences?

Thanks!

1 Like

One thought I had was to be able to write something like this:

let notifications = NotificationCenter
    .default
    .notifications(named: UITextField.textDidChangeNotification)
    .debounce(2)

Operators on async iterators, basically. So I hacked up a not-debounce operator that tries to demonstrate how async iterators might be composed. It isn't pretty, but maybe someone smarter than I might be inspired to riff off this:

extension AsyncSequence {
    
    func debounce(_ timeInterval: TimeInterval) -> some AsyncSequence {
        AsyncJoiner(
            before: AsyncDebouce(timeInterval: timeInterval, value: ()),
            after: self
        )
    }
}

struct AsyncJoiner<BeforeSequence, AfterSequence>: AsyncSequence where BeforeSequence: AsyncSequence, AfterSequence: AsyncSequence {
    typealias Element = AfterSequence.AsyncIterator.Element
    let before: BeforeSequence
    let after: AfterSequence
    
    init(before: BeforeSequence, after: AfterSequence) {
        self.before = before
        self.after = after
    }
    
    func makeAsyncIterator() -> AsyncIterator<BeforeSequence.AsyncIterator, AfterSequence.AsyncIterator> {
        AsyncIterator(before: before.makeAsyncIterator(), after: after.makeAsyncIterator())
    }
    
    struct AsyncIterator<BeforeIterator, AfterIterator>: AsyncIteratorProtocol where BeforeIterator: AsyncIteratorProtocol, AfterIterator: AsyncIteratorProtocol {
        typealias Element = AfterIterator.Element
        
        let before: BeforeIterator
        let after: AfterIterator
        
        init(before: BeforeIterator, after: AfterIterator) {
            self.before = before
            self.after = after
        }
        
        mutating func next() async throws -> Element? {
            var before = self.before
            var after = self.after
            let beforeElement = try await before.next()
            let afterElement = try await after.next()
            return afterElement
        }
    }
}

struct AsyncDebouce<AsyncElement>: AsyncSequence {
    typealias Element = AsyncElement
    let timeInterval: TimeInterval
    let value: AsyncElement
    
    func makeAsyncIterator() -> AsyncIterator<AsyncElement> {
        AsyncIterator(timeInterval: timeInterval, value: value)
    }
    
    struct AsyncIterator<AsyncElement>: AsyncIteratorProtocol {
        typealias Element = AsyncElement
        
        let timeInterval: TimeInterval
        let value: AsyncElement
        
        mutating func next() async throws -> AsyncElement? {
            await Task.sleep(.init(timeInterval) * 1000000000)
            return value
        }
    }
}
1 Like

From a general sense making operators as such are perfectly reasonable things to do and AsyncSequence was particularly designed to make it simple to do so (comparatively to other similar APIs in that you don’t need to worry about demand or the thread safety of events because async/await takes care of that). This particular one perhaps does not model debounce as you might hope but it is close. To really get a concept
of debounce you need one concurrency primitive you don’t yet have available, and there probably should be a more refined concept of time than say nanoseconds.

AsyncSequence was designed so that you can safely do more than just composition. That being said I feel like there are definitely areas that the concurrency library can offer some primitives that go beyond just sequence based things; like for example debounce is a pretty common one for folks to use and would be a pretty strong candidate in my book as a proper pitch to add to the transformation “operators” available today.

5 Likes

What is that? Some kind of perform task after delay on the Executor?

The missing primitive is a way of taking the first result of two or more tasks and resume when that first resultant produces a value without cancelling the others. Effectively it needs to be able to determine the winner of a race between tasks.

6 Likes

I tried implementing this primitive with existing concurrency tools and came up with this: A Swift implementation concurrency structure safely racing two tasks against each other. This structure is useful for implementing operators like `debounce` for `AsyncSequence`. · GitHub. It worked fine in preliminary testing, but I’m not sure if I missed something.

I'll try implementing debounce(for:) with this shortly, where I'll test it more thoroughly. Any suggestions on what to test for are welcome!

4 Likes

Interesting, @filip-sakel! Looking forward to the next part. What’d happen if the parent task was cancelled before the race starts? How would one even test that?

1 Like

Thanks for the suggestion!

I think we could check for cancellation at the beginning of raceTasks(_:and:) to avoid unnecessarily starting the "child" tasks. I'm not sure if that's the responsibility of the caller, but it's certainly something we can do.

Also, "racing tasks" should be a structured operation, so cancellation during racing is another consideration, which I didn't take into account. The RacerActor should integrate cancellation checking most likely with withTaskCancellationHandler(operation:onCancel:). I'll look into implementing this as well.

As for testing, I think Task.sleep(_:) could be used, which — albeit not very precise — could indicate if cancellation is seriously impeded.

The reply is late but you could try this package I have been working on GitHub - Henryforce/AsyncTimeSequences: The missing Time Sequence Operators for Swift Concurrency . The key component in it is an AsyncScheduler that is used by different async time sequences like Debounce, Throttle, Delay (also in the package)... It makes use of only Swift Async code. As denoted by other commenters Task execution can lead into races, so the AsyncScheduler internally implements a priority queue to guarantee execution order (based on scheduled and completion time). You can take a look :)

3 Likes

I know this is an old question, but nowadays we would use Apple’s Swift Async Algorithms, which provides a debounce function:

func beginUpdatingSearchResults() async {
    let strings = NotificationCenter
        .default
        .notifications(named: UITextField.textDidChangeNotification, object: searchTermTextField)
        .compactMap { await ($0.object as? UITextField)?.text }
        .debounce(for: .seconds(2))

    for await string in strings {
        …
    }
}
4 Likes

Unfortunately this doesn't behave like a debounce that you might be familiar with from Combine.

First of all, as written, it is debouncing the result of the compactMap above. Right now the compactMap isn't doing anything significant, but if it started doing something that suspended for longer, like an API request:

let strings = NotificationCenter
  .default
  .notifications(named: UITextField.textDidChangeNotification, object: searchTermTextField)
  .map { try await apiRequest.fetch() }
  .debounce(for: .seconds(2))

…then you would be debouncing the responses from the API, not the notifications. This means it is possible to get stale emissions from the sequence.

And if you flip the order of operators so that you debounce the notifications first and then map onto the API request:

let strings = NotificationCenter
  .default
  .notifications(named: UITextField.textDidChangeNotification, object: searchTermTextField)
  .debounce(for: .seconds(2))
  .map { try await apiRequest.fetch() }

…well, unfortunately, that won't work either. That is debouncing the notifications, but it will not cancel an inflight API request if a new notification is posted. So again you can get stale emissions from the sequence.

As far as I can tell, the debounce operator in swift-async-algorithms doesn't play nicely with performing async work, and it doesn't have similar behavior to the debounce operator from Combine.

3 Likes

I’m not sure that debounce is the issue here. Its job is to debounce an asynchronous sequence given some time interval. It doesn’t seem appropriate for it to be doing anything with respect to backpressure.

If anything, I’d be more inclined to ask, academically, why map and compactMap don’t offer back-pressure semantics options. They’re the ones taking async closures, and if any API should be offering back-pressure semantics, these should, not debounce.

But this is all moot. The correct logic is to stop searches as soon as the user starts typing anything, no delay, not after debouncing. The quiescence identified by debounce is only used to determine when to start the next search, but cancelation of prior one should happen immediately when the user types anything new, without delay. Not only is this the right/efficient solution, but it completely eliminates this back-pressure problem.

Now the original question is regarding keyboard debouncing and search logic, so hopefully the above answers that question. In those cases where I can’t avoid it and need to handle backpressure in my AsyncSequence, I don’t expect the asynchronous sequence to handle that, but rather I incorporate that logic in the for-await-in loop (or the method this loop calls). E.g., in your fetch(for:) example, that wrapper might look like:

private var previousTask: Task<[String], Error>?

func search(for string: String) async throws -> [String] {
    let task = Task { [previousTask] in
        previousTask?.cancel()
        return try await apiRequest.fetch(for: string)
    }
    previousTask = task
    return try await task.value
}
1 Like

Also would point this out wrt throttling, it does not behave as one would expect to be useful in most cases: