Queueing Effects and knowing when they're all completed?

Are there any recommended ways to handle a large collection of effects (think downloading a list of items, each of which in turn triggers downloads of arbitrary child items), queueing all of those up, and firing an action when they are all completed?

I've tried a counter, but it's too fragile.

Can you provide some example code, maybe with the counter? It sounds like what you're describing can be achieved using standard publisher operators.
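
For example, here's a minimal plain-Combine sketch (no TCA) of firing a single "all done" event once a batch of publishers completes; the fakeDownload helper is hypothetical, just standing in for a longer-running network call:

import Combine
import Foundation

// Hypothetical stand-in for a longer-running download.
func fakeDownload(_ id: Int) -> AnyPublisher<Int, Never> {
    Just(id)
        .delay(for: .milliseconds(Int.random(in: 10...100)), scheduler: DispatchQueue.main)
        .eraseToAnyPublisher()
}

let downloads = (0..<10).map(fakeDownload)

// MergeMany runs all upstreams; collect() emits a single array only
// after every one of them has finished, so no manual counter is needed.
let cancellable = Publishers.MergeMany(downloads)
    .collect()
    .sink { results in
        print("All \(results.count) downloads finished")
    }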

Here's a sample I put together. It runs a set of tasks, and the tasks are batched in groups of 4 (think longer-running network calls). The counter keeps track of when they are done, and a secondary reducer just listens for all tasks to be completed. This is a macOS console app. Ideally there would be a Combine way of knowing when tasks are done rather than just keeping a count.

main.swift:

import Foundation
import ComposableArchitecture

let store = Store(
    initialState: AppState(),
    reducer: appReducer.combined(with: finishedReducer),
    environment: AppEnvironment()
)

ViewStore(store).send(.getItems(count: 100))

dispatchMain()

app.swift:

import Foundation
import ComposableArchitecture

// MARK: - App State  ---------------------------------

struct AppState: Equatable {
    var successfulTasks: Int = 0
    var pendingTasks: Int = 0
}

enum AppAction {
    case getItems(count: Int)
    case getChildItems
    case processChildItem
}

struct AppEnvironment { }

// MARK: - App Reducers  ---------------------------------

let appReducer = Reducer<AppState, AppAction, AppEnvironment> { state, action, env in
    switch action {
        
        case let .getItems(count: count):
            
            state.pendingTasks += count
            
            let effects = (0..<count).map { _ in
                Effect<AppAction, Never>(value: .getChildItems)
            }
            return .concatenate(effects)
        
        case .getChildItems:
            
            state.pendingTasks -= 1

            let effects = (1...Int.random(in: 20...100)).map { _ in
                Effect<AppAction, Never>(value: .processChildItem)
            }
            state.pendingTasks += effects.count

            // Batch the work in groups of 4
            let step = 4
            let maxIndex = effects.count

            let partitioned: [Effect<AppAction, Never>] = stride(from: 0, to: maxIndex, by: step).map { index in
                let endIndex = min(index + step, maxIndex)
                return Effect.merge(effects[index..<endIndex])
            }
            
            return .concatenate(partitioned)
        
        
        case .processChildItem:
            state.successfulTasks += 1
            state.pendingTasks -= 1
            print("Processed a child item; \(state.pendingTasks) tasks still pending")
            return .none
    }
    
}

let finishedReducer = Reducer<AppState, AppAction, AppEnvironment> { state, action, env in
    if state.pendingTasks == 0 {
        print("done")

        print("Summary")
        print("=======")
        print("successes    = \(state.successfulTasks)")

        exit(EXIT_SUCCESS)
    }
    return .none
}

project file: https://www.dropbox.com/s/lubvcmq861thgh9/group-tasks.zip?dl=1

Not sure if this applies as I haven't studied your example code in detail, but a while ago I wrote a batched database copy with futures. That sounds like the same problem, as I needed to wait for everything to complete before progressing through stages.

What I ended up doing was calling the batch copy, which returned a Future<[Results]>, recursively until the list of candidates was exhausted.

Sounds like your candidates are URLs and you could recursively process them until you’re out of URLs to process?
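
Roughly, a sketch of what I mean; the copyBatch body here is hypothetical, so substitute your real batch copy:

import Combine
import Foundation

// Hypothetical batch copy: processes up to `batchSize` candidates and
// fulfils its promise with whatever is left to do.
func copyBatch(_ candidates: [URL], batchSize: Int = 4) -> Future<[URL], Never> {
    Future { promise in
        candidates.prefix(batchSize).forEach { print("Copying \($0)") } // real work here
        promise(.success(Array(candidates.dropFirst(batchSize))))
    }
}

// Recurse until the candidate list is exhausted, then emit a single value.
func copyAll(_ candidates: [URL]) -> AnyPublisher<Void, Never> {
    copyBatch(candidates)
        .flatMap { remaining -> AnyPublisher<Void, Never> in
            remaining.isEmpty
                ? Just(()).eraseToAnyPublisher()
                : copyAll(remaining)
        }
        .eraseToAnyPublisher()
}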

Does this sound like an accurate high level description of the behaviour you're trying to achieve?

  1. Initial downloads (getItems) are executed serially.
  2. For each initial download completion, downloading of child items begins in batches of 4, where downloads within a batch are executed in parallel. Only when a batch is complete does downloading of the next batch begin.
  3. An action should be dispatched to signify the completion of all child tasks.

If so, I have a couple of questions:

  1. Is it a requirement that initial downloads are executed serially, or could they be done in parallel instead?
  2. Is the bursty child item download behaviour entirely intentional, or is the true requirement simply to limit max concurrent downloads to 4?

The reason I ask is that it might be possible to delegate some of these requirements to the environment, e.g. a download client that uses URLSession could take advantage of httpMaximumConnectionsPerHost. It's also possible that there are Combine operators that can be used to limit concurrency in a way that makes sense for you, although with the code in its current state I'm not sure about that.
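
To illustrate both ideas, here's a rough sketch; the download(_:) wrapper and the URLs are placeholders, and error handling is elided:

import Combine
import Foundation

// Option 1: let URLSession itself cap simultaneous connections per host.
let config = URLSessionConfiguration.default
config.httpMaximumConnectionsPerHost = 4
let session = URLSession(configuration: config)

// Hypothetical wrapper around a single download.
func download(_ url: URL) -> AnyPublisher<Data, Never> {
    session.dataTaskPublisher(for: url)
        .map(\.data)
        .replaceError(with: Data()) // error handling elided for brevity
        .eraseToAnyPublisher()
}

// Option 2: flatMap's maxPublishers keeps at most 4 downloads in flight,
// without hard batch boundaries, and collect() fires once all finish.
let urls: [URL] = [] // your candidate URLs

let cancellable = urls.publisher
    .flatMap(maxPublishers: .max(4)) { download($0) }
    .collect()
    .sink { _ in print("All downloads complete") }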

Here's a slight rework of your code which demonstrates how you can stay in effects world for the entire duration of the downloads:

struct AppState: Equatable {
  var didComplete = false
}

enum AppAction {
    case getItems(count: Int)
    case didComplete
}

struct AppEnvironment {
  var getItem: () -> Effect<Void, Never> = { Effect(value: ()) }
  var getChildItem: () -> Effect<Void, Never> = { Effect(value: ()) }
}

// MARK: - App Reducers  ---------------------------------

let appReducer = Reducer<AppState, AppAction, AppEnvironment> { state, action, env in
  switch action {
  case let .getItems(count: count):
    return Effect.concatenate((0..<count).map { _ in env.getItem() })
      .flatMap { _ -> Effect<Void, Never> in
        let effects = (1...Int.random(in: 20...100)).map { _ in
          env.getChildItem()
        }
        
        // Batch the work in groups of 4
        let step = 4
        let maxIndex = effects.count
        
        let partitioned: [Effect<Void, Never>] = stride(from: 0, to: maxIndex, by: step).map { index in
          let endIndex = min(index + step, maxIndex)
          return Effect.merge(effects[index..<endIndex])
        }
        
        return .concatenate(partitioned)
    }
    .last()
    .map { .didComplete }
    .eraseToEffect()
    
    
  case .didComplete:
    state.didComplete = true
    return .fireAndForget {
      print("Finished.")
      exit(EXIT_SUCCESS)
    }
  }
}
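
For completeness, here's a sketch of what a live environment might look like for this version; the URLs are placeholders and error handling is elided:

import Combine
import ComposableArchitecture
import Foundation

extension AppEnvironment {
  static let live = AppEnvironment(
    getItem: {
      URLSession.shared.dataTaskPublisher(for: URL(string: "https://example.com/items")!)
        .map { _ in () }        // only completion matters here
        .replaceError(with: ()) // error handling elided
        .eraseToEffect()
    },
    getChildItem: {
      URLSession.shared.dataTaskPublisher(for: URL(string: "https://example.com/child")!)
        .map { _ in () }
        .replaceError(with: ())
        .eraseToEffect()
    }
  )
}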

I'll take a closer look at your code, thanks!

To answer your questions: these are downloads from an unknown set of URLs, and the data is all coming from different locations. This code would run on device, so doing more than 4 or so concurrent connections wouldn't be a good idea. That's why (1) I batch the requests to run only 4 at a time, and (2) the batches run serially, so as not to overwhelm the system.

I see, so it seems like an issue with the current implementation is that it's actually possible to have initialDownloadCount * 4 concurrent downloads, since concatenation/serialisation is only done at the initial download level rather than initial + child. This might be more appropriate:

let appReducer = Reducer<AppState, AppAction, AppEnvironment> { state, action, env in
  switch action {
  case let .getItems(count: count):
    return Effect
      .concatenate(
        (0..<count).map { _ in
          env.getItem()
            .flatMap { _ -> Effect<Void, Never> in
              let effects = (1...Int.random(in: 20...100)).map { _ in
                env.getChildItem()
              }
              
              // Batch the work in groups of 4
              let step = 4
              let maxIndex = effects.count
              
              let partitioned: [Effect<Void, Never>] = stride(from: 0, to: maxIndex, by: step).map { index in
                let endIndex = min(index + step, maxIndex)
                return Effect.merge(effects[index..<endIndex])
              }
              
              return .concatenate(partitioned)
          }
          .eraseToEffect()
      })
      .last()
      .map { .didComplete }
      .eraseToEffect()
    
    
  case .didComplete:
    state.didComplete = true
    
    return .fireAndForget {
      print("Finished.")
      exit(EXIT_SUCCESS)
    }
  }
}

Awesome, thanks so much for your help!