Issues with throttling and potential solutions

Effect.throttle is used to throttle the output of effects that are associated with an identifier, based on a similar sort of concept to cancellable and debounce:

func throttle<S>(
        id: AnyHashable,
        for interval: S.SchedulerTimeType.Stride,
        scheduler: S,
        latest: Bool
      ) -> Effect where S: Scheduler

It's currently internal due to testing issues outlined in:

There are a few concerns I have which weren't discussed in this GitHub issue that I'm interested to hear others' feedback on.

When throttle is called more than once with the same id on Effects whose Outputs do not match, the following line can cause a crash:

let value = latest ? value : (throttleValues[id] as! Output? ?? value) // throttleValues is [AnyHashable: Any]

This test demonstrates how one could write code that would crash the application:

func testThrottleDifferentTypes() {
    struct CancelToken: Hashable {}

    let e1 = Just(1)
        .prepend(2)
        .eraseToEffect()
        .throttle(id: CancelToken(), for: 1, scheduler: DispatchQueue.testScheduler, latest: false)
    
    let e2 = Just("1")
        .eraseToEffect()
        .throttle(id: CancelToken(), for: 1, scheduler: DispatchQueue.testScheduler, latest: false)
    
    e1.sink(receiveValue: { _ in })
    e2.sink(receiveValue: { _ in })
    
    scheduler.advance(by: 1)
}

One potential fix is to use a type other than AnyHashable for id that is generic over the Output type, ensuring that casting will succeed e.g.

Define a new type struct ThrottleId<Key, Output>: Hashable {} and modify throttle signature:

func throttle<S, K>(
    id: ThrottleId<K, Output>,
    for interval: S.SchedulerTimeType.Stride,
    scheduler: S,
    latest: Bool
  ) -> Effect where S: Scheduler

Then you'd get the following usage:

Just(1)
    .eraseToEffect()
    .throttle(id: ThrottleId<CancelToken, Int>(), for: 1, scheduler: DispatchQueue.testScheduler, latest: false) // Does not compile if `Int` is changed to another type.

You could even (debatably) slightly improve the ergonomics:

struct ThrottleId<Key, Output>: Hashable {    
    init(_ : Key.Type, _ : Output.Type = Output.self) {}
}

Resulting in the following usage:

Just(1)
    .eraseToEffect()
    .throttle(id: ThrottleId(CancelToken.self), for: 1, scheduler: DispatchQueue.testScheduler, latest: false) // Does not compile if `Int` is changed to another type.

A similar crash crops up here: let throttleTime = throttleTimes[id] as! S.SchedulerTimeType? if throttle is called more than once with the same id but different schedulers whose SchedulerTimeType do not match. Again, the crash could be prevented moving the scheduler type into the key.

Neither of these solutions prevent the programmer from writing code that will throttle in unexpected ways, but they do at least prevent crashes.

There's also currently no locking around the throttleTimes or throttleValues dictionaries, but that seems to be slightly more straightforward and less open ended.

Hey @klop!

We actually have a branch that fixes the tests (and adds locking) here if you wanna try it out: GitHub - pointfreeco/swift-composable-architecture at throttle-tests

We just haven't gotten around to wrapping it up and merging.

As for the other potential crashes, these are design choices that we made because it seemed strange to reuse the same throttle ID across different output types or schedulers, and it's unclear to me what the behavior should be in those cases. Should such throttling be shared between effect A and effect B? I also wonder if such a throttle ID collision is more likely to be accidental than intentional. Do you have a specific use case in mind? It would help me understand if these crashes should be mitigated.

Hey @stephencelis, thanks for letting me know about the branch, I'll definitely check it out!

I totally agree -- it would be very strange to re-use the same throttle ID across different output types or schedulers, and I can't think of any use cases off the top of my head that aren't contrived. The downside to crashing is that it can be difficult to reproduce, and I could see a case where accidental sharing of a throttle ID passes code review, passes tests, then crashes in prod. On the other hand, I can also see it being argued that the downsides in moving from AnyHashable to a different id type don't outweigh the benefit of not being able to crash.

Tangentially related: I hacked something together to demonstrate how it's possible to leverage existing Combine operators to implement some of the Effect extensions. Maybe this is an idea you've already explored and decided wasn't right, and personally I'm not thrilled about the way it works, but I just thought I'd get it out there anyway!

public struct EffectId<Key, Output, Failure>: Hashable {
    public init(_ : Key.Type, _: Output.Type = Output.self, _: Failure.Type = Failure.self) {}
}

extension Effect {
    func modulate<Key>(id: EffectId<Key, Output, Failure>, flatten: @escaping (AnyPublisher<Effect<Output, Failure>, Failure>) -> Effect) -> Effect {
        return Deferred { () -> AnyPublisher<Output, Failure> in
            if let subject = subjects[id] as! PassthroughSubject<Effect<Output, Failure>, Failure>? {
                subject.send(self)
                return Empty().eraseToAnyPublisher()
            }
            
            let subject = PassthroughSubject<Effect<Output, Failure>, Failure>()
                        
            let flattened = flatten(subject.prepend(self).eraseToAnyPublisher())
                .handleEvents(
                    receiveCancel: {
                        subjects[id] = nil
                })
            
            subjects[id] = subject
            
            return flattened.eraseToAnyPublisher()
        }
        .eraseToEffect()
    }
    
    public func cancellable2<K>(id: EffectId<K, Output, Failure>) -> Effect {
        return modulate(id: id) { publisher in
            publisher
                .switchToLatest()
                .eraseToEffect()
        }
    }
    
    public static func cancel2<K>(id: EffectId<K, Output, Failure>) -> Effect {
        // need to create API for sending values to passthrough subjects based on ID.
        // would then be able to send `Empty` to the subject here to cancel.
    }
    
    public func throttle2<S: Scheduler, K>(id: EffectId<K, Output, Failure>, for interval: S.SchedulerTimeType.Stride, scheduler: S, latest: Bool) -> Effect {
        return modulate(id: id) { publisher in
            publisher
                .throttle(for: interval, scheduler: scheduler, latest: latest)
                .switchToLatest()
                .eraseToEffect()
        }
    }
    
    public func debounce2<S: Scheduler, K>(
      id: EffectId<K, Output, Failure>,
      for dueTime: S.SchedulerTimeType.Stride,
      scheduler: S
    ) -> Effect {
        return modulate(id: id) { publisher in
            publisher
                .debounce(for: dueTime, scheduler: scheduler)
                .switchToLatest()
                .eraseToEffect()
        }
    }
}

When writing some quick tests for the snippet above, I did notice that Combine's version of throttle seems to produce a different output to yours, but I haven't had enough time to explore why that is yet. I do wonder if latest has the inverse meaning in their API, or maybe there's a bug in Combine (or the test scheduler)?

2 Likes

That modulate is very interesting! Played with it a bit this morning and seems like a promising addition to the library and will help us clean up a bunch of that code. The behavior difference you noticed turned out to be a bug. We weren't scheduling immediate work as we should have. Here's the fix, thanks! Fix throttle scheduling · pointfreeco/swift-composable-architecture@3e4a051 · GitHub

We're gonna let modulate marinate a bit and hope to revisit it soon, since it would allow us to unlock a lot of additional, interesting effects!

1 Like

I did a little cleaning up, added an additional operator that limits max concurrency to switch up the flattening strategies a bit, and in the process came up with an absolutely heinous way of making keys that is so just wrong (see testCancellationWithEffectId() :smiley:). I also came to the realisation that these operators make most sense where Effect is constrained to Failure == Never.

@stephencelis what about extending Reducer with enhancements that control the flow of effects? The nice thing about this (if it works, which tbh I'm unsure about), is that you'd no longer have to define identifier types, or even think about identifiers at all. Another advantage with this approach is that you no longer have to think about "meta" operators on Effect e.g. did I want to use .debounce(id:for:scheduler:) or .debounce(for:scheduler:)? Here are a couple of examples of enhancements that aren't at all perfect, but at least compile on my testing branch:

extension Reducer {
  public func switchingToLatestEffect<Value>(
    from path: CasePath<Action, Value>
  ) -> Reducer {
    let uuid = UUID()
    
    return Reducer { state, action, env in
      let effect = self.run(&state, action, env)
      return path.extract(from: action) == nil
        ? effect
        : effect.cancellable(id: uuid)
    }
  }
  
  public func debouncingEffects<S, Value>(
    from path: CasePath<Action, Value>,
    for dueTime: S.SchedulerTimeType.Stride,
    scheduler: KeyPath<Environment, S>
  ) -> Reducer where S: Scheduler {
    let uuid = UUID()
    
    return Reducer { state, action, env in
      let effect = self.run(&state, action, env)
      return path.extract(from: action) == nil
        ? effect
        : effect.debounce(id: uuid, for: dueTime, scheduler: env[keyPath: scheduler])
    }
  }
}

And now for some example usage, adapted from SearchView.swift. Here's the current implementation first:

let searchReducer = Reducer<SearchState, SearchAction, SearchEnvironment> {
  state, action, environment in
  switch action {
  case let .searchQueryChanged(query):
    struct SearchLocationId: Hashable {}

    state.searchQuery = query

    // When the query is cleared we can clear the search results, but we have to make sure to cancel
    // any in-flight search requests too, otherwise we may get data coming in later.
    guard !query.isEmpty else {
      state.locations = []
      state.locationWeather = nil
      return .cancel(id: SearchLocationId())
    }

    return environment.weatherClient
      .searchLocation(query)
      .receive(on: environment.mainQueue)
      .catchToEffect()
      .debounce(id: SearchLocationId(), for: 0.3, scheduler: environment.mainQueue)
      .map(SearchAction.locationsResponse)
    
  case ...
  }
}

And here's how it looks with the enhancements on Reducer:

let searchReducer = Reducer<SearchState, SearchAction, SearchEnvironment> {
  state, action, environment in
  switch action {
  case let .searchQueryChanged(query):
    state.searchQuery = query

    // When the query is cleared we can clear the search results, but we have to make sure to cancel
    // any in-flight search requests too, otherwise we may get data coming in later.
    guard !query.isEmpty else {
      state.locations = []
      state.locationWeather = nil
      return .none
    }

    return environment.weatherClient
      .searchLocation(query)
      .receive(on: environment.mainQueue)
      .catchToEffect()
      .map(SearchAction.locationsResponse)
    
  case ...
  }
}
.switchingToLatestEffect(from: /SearchAction.searchQueryChanged)
.debouncingEffects(from: /SearchAction.searchQueryChanged, for: 0.3, scheduler: \.mainQueue)

Something that stands out to me as "wrong" in my example implementation is that UUIDs are getting plucked out of thin air, but I reckon there are ways to get around that should it be required.

edit: since I posted this I did some testing and came to the conclusion that... it just doesn't work with modulate. I think it might work with the way throttle, debounce, cancellable etc. were originally implemented though.