So here's some more example code for a question that I've been asked repeatedly: How can I run a large number of operations whilst limiting them to N running at the same time? You may want to do that if you have to make many HTTP requests to a website but don't want to overload it, so you do, say, only 10 at the same time.
Essentially, this is an extension of the "Loops" with futures question I covered recently. "Loops" with futures is really the same question except that it runs only 1 operation at a time, i.e. the operations run sequentially.
This time, we're running maximumConcurrentOperations at the same time.
The basic idea is: Let's take all operations and split them into batches of size maximumConcurrentOperations (hence the nested Array(Slice)). Then we "loop" over the batches, running the operations in each batch concurrently.
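The splitting step on its own can be tried without any futures. The following is a minimal sketch, assuming a hypothetical helper named splitIntoBatches (it is not part of the post's code) that applies the same stride-and-slice idea to a plain array, using the 30 operations and batch size of 8 from the demo further down:

```swift
// Hypothetical helper for illustration only: split an array into
// consecutive slices holding at most `size` elements each.
func splitIntoBatches<Element>(_ elements: [Element], size: Int) -> [ArraySlice<Element>] {
    // stride(from:to:by:) traps on a zero step, so reject sizes below 1 up front.
    precondition(size > 0, "batch size must be at least 1")
    return stride(from: elements.startIndex, to: elements.endIndex, by: size).map { first in
        // The last batch may be shorter, hence the min(...) on the upper bound.
        elements[first ..< min(elements.endIndex, first + size)]
    }
}

let batches = splitIntoBatches(Array(0..<30), size: 8)
print(batches.map { $0.count })  // [8, 8, 8, 6]
print(Array(batches[3]))         // [24, 25, 26, 27, 28, 29]
```

Because each batch is an ArraySlice, no elements are copied; the slices just point into the original array.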
import NIO

func runConcurrently(maximumConcurrentOperations: Int,
                     eventLoop: EventLoop,
                     operations: [() -> EventLoopFuture<Void>]) -> EventLoopFuture<Void> {
    // A batch size below 1 cannot make progress (and a zero stride would trap).
    precondition(maximumConcurrentOperations > 0, "maximumConcurrentOperations must be at least 1")

    // Step 1: Let's split our operations into the batches that we run concurrently.
    let operationsSplit = stride(from: operations.startIndex, to: operations.endIndex, by: maximumConcurrentOperations).map { first in
        return operations[first ..< min(operations.endIndex, first + maximumConcurrentOperations)]
    }
    let allDonePromise = eventLoop.makePromise(of: Void.self)

    // Step 2: Let's "loop" over the batches and kick the operations off, concurrently.
    func doIt(batchesLeft: ArraySlice<ArraySlice<() -> EventLoopFuture<Void>>>) {
        var batchesLeft = batchesLeft
        guard let nextBatch = batchesLeft.popFirst() else {
            // No batches left: every operation has succeeded.
            allDonePromise.succeed(())
            return
        }
        // Start every operation in this batch; move on once all of them have succeeded.
        EventLoopFuture<Void>.andAllSucceed(nextBatch.map { operation in operation() },
                                            on: eventLoop).map { [batchesLeft] in
            doIt(batchesLeft: batchesLeft)
        }.cascadeFailure(to: allDonePromise)
    }

    // this just kicks it off
    doIt(batchesLeft: operationsSplit[...])
    return allDonePromise.futureResult
}
If you wonder why we use recursion to "loop" over the batches, check the "Loops" with futures post.
A fun way to try this out is the following code:
// Assumption: any EventLoopGroup works here; a multi-threaded one is used for illustration.
let group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)

// Make 30 dummy operations that just wait a second.
let operations = (0..<30).map { i in
    return {
        return group.next().scheduleTask(in: .seconds(1)) {
            print(i)
        }.futureResult
    }
}

// Roughly every second (1010 ms between runs), print a newline so we see them split nicely.
group.next().scheduleRepeatedTask(initialDelay: .seconds(0), delay: .milliseconds(1010)) { _ in
    print()
}

// Kick them off in batches of 8
try runConcurrently(maximumConcurrentOperations: 8,
                    eventLoop: group.next(),
                    operations: operations).wait()
Example output looks like this:
0
1
4
2
6
7
3
5

8
9
15
11
14
10
12
13

18
16
17
19
You can nicely see the three batches and also how the operations within a batch complete out of order because they're running concurrently.
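To double-check that claim against the output shown above, here is a small sketch in plain Swift. It assumes only the numbers as printed (operations 0 through 19) and the batch size of 8; the names printed, batchSize and chunks are made up for this check. It cuts the printed sequence into chunks of 8 and confirms that each chunk holds exactly the operations of one batch, just not in submission order:

```swift
// The numbers from the example output, in the order they were printed.
let printed = [0, 1, 4, 2, 6, 7, 3, 5,
               8, 9, 15, 11, 14, 10, 12, 13,
               18, 16, 17, 19]
let batchSize = 8

// Cut the printed sequence into chunks of `batchSize`, mirroring the batching step.
let chunks = stride(from: 0, to: printed.count, by: batchSize).map { first in
    Array(printed[first ..< min(printed.count, first + batchSize)])
}

for (index, chunk) in chunks.enumerated() {
    let lowest = index * batchSize
    let expected = Array(lowest ..< lowest + chunk.count)
    // Within a batch the completion order is arbitrary, but the set of operations is fixed.
    let sameOperations = chunk.sorted() == expected
    let inSubmissionOrder = chunk == expected
    print("batch \(index): same operations = \(sameOperations), in submission order = \(inSubmissionOrder)")
}
```

For the output above, every batch reports the same operations as expected, and none of them is in submission order. No operation from a later batch ever shows up before an earlier batch has finished.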