Running many operations, concurrently, but in batches

So here's some more example code for a question that I've been asked repeatedly: How can I run a large number of operations whilst limiting this to N running at the same time. You may want to do that if you have to do many HTTP requests to a website but you don't want to overload the website and say do only 10 at the same time.

Essentially, this is an extension of the "Loops" with futures question I covered recently. "Loops" with futures is really the same question except that it's running only 1 operation at the same time, ie. they're run sequentially.

This time, we're running maximumConcurrentOperations at the same time.

The basic idea is: Let's take all operations, split them in batches of size maximumConcurrentOperations (hence the nested Array(Slice)). Then we "loop" over the batches running each of the operations in a batch concurrently.

func runConcurrently(maximumConcurrentOperations: Int, eventLoop: EventLoop, operations: [() -> EventLoopFuture<Void>]) -> EventLoopFuture<Void> {
    // Step 1: Let's split our operation into the batches that we run concurrently.
    let operationsSplit = stride(from: operations.startIndex, to: operations.endIndex, by: maximumConcurrentOperations).map { first in
        return operations[first ..< min(operations.endIndex, first + maximumConcurrentOperations)]
    }

    let allDonePromise = eventLoop.makePromise(of: Void.self)

    // Step 2: Let's "loop" over the batches and kick the operations off, concurrently.
    func doIt(batchesLeft: ArraySlice<ArraySlice<() -> EventLoopFuture<Void>>>) {
        var batchesLeft = batchesLeft
        guard let nextBatch = batchesLeft.popFirst() else {
            allDonePromise.succeed(())
            return
        }

        EventLoopFuture<Void>.andAllSucceed(nextBatch.map { operation in operation() },
                                            on: eventLoop).map { [batchesLeft] in
                                                doIt(batchesLeft: batchesLeft)
                                            }.cascadeFailure(to: allDonePromise)
    }

    // this just kicks it off
    doIt(batchesLeft: operationsSplit[...])
    return allDonePromise.futureResult
}

If you wander why we use recursion to "loop" over the batches check the "Loops" with futures post.

A fun way to try this out is the following code:


// Make 30 dummy operations that just wait a second.
let operations = (0..<30).map { i in
    return {
        return group.next().scheduleTask(in: .seconds(1)) {
            print(i)
        }.futureResult
    }
}

// Every 1.1 seconds, print a newline so we see them split nicely.
group.next().scheduleRepeatedTask(initialDelay: .seconds(0), delay: .milliseconds(1010)) { _ in
    print()
}

// Kick them off in batches of 8
try runConcurrently(maximumConcurrentOperations: 8,
                    eventLoop: group.next(),
                    operations: operations).wait()

Example output looks like this:

0
1
4
2
6
7
3
5

8
9
15
11
14
10
12
13

18
16
17
19

You can nicely see the three batches and also how the operations complete out of order because they're running concurrently.

4 Likes