Implement concurrentPerform / Promise Queue in NIO

Hi,

I am new to NIO and would like to implement a concurrent promise queue with limited parallelism (e.g. max 500 jobs running at once, but within that limit as many as possible). I am unsure how to go about this using NIO.

I can see that the ChannelPipeline API might be able to help by somehow using the BackPressureHandler class, but I have no idea how to use a Pipeline whatsoever - uses of NIO that I'm familiar with so far all work at a higher level (mostly with EventLoopFuture).

Alternatively I could just use GCD and literally just use concurrentPerform, like in the example in the NIO reference:

let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { try? eventLoopGroup.syncShutdownGracefully() }

func concurrentPerform() -> EventLoopFuture<[Int]> {
    let jobCount = 10000
    let returningEventLoop = eventLoopGroup.next()

    let jobs = (0 ..< jobCount).map { index in
        eventLoopGroup.next().makePromise(of: Int.self)
    }

    DispatchQueue.concurrentPerform(iterations: jobCount) { index in
        // do async work
        jobs[index].completeWith(
            returningEventLoop.makeSucceededFuture(Int.random(in: 0 ..< 100))
            // actually this would be a call to the aws-sdk, which already uses NIO and returns a future
        )
    }

    return EventLoopFuture.whenAllSucceed(jobs.map { $0.futureResult }, on: returningEventLoop)
}

There seems to be two sources of redundancy here though:

  1. The work I'd like to perform concurrently is already using NIO, so there shouldn't be a need to use GCD (to reiterate: I'm using it here to manage concurrency)
  2. I'm making a bunch of promises only to fulfil them with a future, and then at the bottom get the future result out of them again. This is probably just due to the GCD workaround though.

Does anyone have an idea how to remove those sources of redundancy? Are there inbuilt ways of doing this or do I need to somehow make my own "Parallel Future Queue"? Note that in this example GCD will be limiting parallelism to the number of threads/cores, but actually I'd like to control the parallelism (I want to shoot off many HTTP requests at once – but a limited number at a time – and await their responses).

As an aside: how do I tell MultiThreadedEventLoopGroup to use as many threads as there are cores? Or is that undesirable?

This is unlikely to be the level of abstraction at which you want to work. A ChannelPipeline is associated with a single logical stream of I/O such as one TCP connection or one HTTP/2 stream. Thus, unless you are trying to implement this queue for a single connection this is unlikely to be right.

NIO does not provide a system for this today. This is not a terribly difficult thing to build though: you just need a central place to keep track of what work has to be done, how much is currently outstanding, and when it completes. This can be done pretty simply with a small class. Note that this object will need to be thread-safe and so will need to use locks internally to guarantee that it behaves correctly.

System.coreCount tells you how many cores you have.

1 Like

Thanks a lot for your reply here! Is there a way of avoiding locks by using EventLoop.hop or similar? Or is this not the intended use case for that API?

You can, but for this kind of use-case it'll probably hurt more than it helps. The way you do it is to decide that the central place has all its state on a single event loop, and that all work must pass through that event loop. The downside of this is that it forces this event loop to spend a bunch of extra time managing the completion of work, rather than spreading the cost of doing so over multiple workers. That's just not a great outcome.

Ok, thanks for the info!

I put together a (non-synchronised) version. Would greatly appreciate your feedback on this. The main thing it needs is some kind of synchronisation, because the EventLoop we pass may not be running on the main thread. I still have the impression it might be enough to always queue and dequeue jobs from within the ConcurrentWorkQueue's eventLoop (using eventLoop.submit) – managing the queue should be trivial compared to the actual work to be queued. But as I say I am new to this and welcome to all and any ideas!

func concurrentPerform() -> EventLoopFuture<[Lambda.InvocationResponse]> {
    let concurrentQueue = ConcurrentWorkQueue<Lambda.InvocationResponse>(on: eventLoopGroup.next(), maxConcurrentJobs: 100)

    for _ in 0 ..< 1000 {
        concurrentQueue.add { () -> EventLoopFuture<Lambda.InvocationResponse> in
            return lambda.invoke(request)
        }
    }

    return concurrentQueue.resultPromise.futureResult
}

class ConcurrentWorkQueue<JobType> {
    private var currentlyRunningJobs: Int = 0
    private let maxConcurrentJobs: Int

    private var jobs: [() -> EventLoopFuture<JobType>] = []
    private var promises: [EventLoopPromise<JobType>] = []

    private let eventLoop: EventLoop
    var resultPromise: EventLoopPromise<[JobType]>

    init(on eventLoop: EventLoop, maxConcurrentJobs: Int) {
        precondition(maxConcurrentJobs > 0)
        self.maxConcurrentJobs = maxConcurrentJobs

        self.eventLoop = eventLoop
        resultPromise = eventLoop.makePromise(of: [JobType].self)
    }

    func add(_ job: @escaping () -> EventLoopFuture<JobType>) {
        print("Queueing job")
        jobs.append(job)
        self.dequeue()
    }

    private func dequeue() {
        if currentlyRunningJobs >= maxConcurrentJobs {
            return
        }

        guard jobs.isEmpty == false else {
            // This seems weird, because I think we already know they're complete. Is there a better way?
            // I am aware that this fail if there is an error - I'm undecided whether I want the entire queue to fail early (below) or return a Promise of `[Result<JobType, Error>]` here
            EventLoopFuture.whenAllSucceed(promises.map { $0.futureResult }, on: self.eventLoop).cascade(to: resultPromise)
            return
        }

        let newJob = jobs.removeFirst()

        // This is some shonky synchonisation that assumes we'll end up with the same order as what we provided (assuming the order of results should be the same as the order of jobs added)
        // In general, it feels weird to maintain a list of both Promises and Futures. Coming from JS, I don't understand the design decision to provide both?
        let newJobPromise = self.eventLoop.makePromise(of: JobType.self)
        promises.insert(newJobPromise, at: 0)

        self.currentlyRunningJobs += 1

        eventLoop.flatSubmit {
            print("Running job")
            return newJob()
        }.always { result in
            self.currentlyRunningJobs -= 1
            self.dequeue()
        }
        .cascade(to: newJobPromise)
    }
}

You're currently using EventLoopFuture.whenAllSucceed and an array of promises. That's basically unnecessary. Given that you want an array of results, you would probably have an easier time allocating that array ahead of time and then giving each job an index into that array into which they will store their result. That avoids the need to allocate promises ahead of time and then map them into their results

One note: you'll need this to be an array of Optional<JobType> and initialised to have nil at every element first.

This implementation currently doesn't go "wide": it runs all work on a specific event loop. Instead it seems like you probably want an event loop group and to use next() to obtain a loop to submit work to?

Promises and Futures go hand-in-hand. A Future is an object that holds the possibility of a result sometime in the future, as well as a callback chain. You cannot satisfy a Future: it is essentially "read only". A Promise is the writable side of a Future: you can satisfy a Promise. Thus, if you have a Promise you are probably responsible for satisfying it (or arranging for it to be satisfied). If you have a Future, that's not your job: you just consume the data.

You're currently using EventLoopFuture.whenAllSucceed and an array of promises. That's basically unnecessary. Given that you want an array of results, you would probably have an easier time allocating that array ahead of time and then giving each job an index into that array into which they will store their result. That avoids the need to allocate promises ahead of time and then map them into their results

Thanks for the tip! It felt weird to be mutating the Array directly but I will give it a try. Presumably I still need to keep a single result promise and fulfil it with that array though? If I understand your last point correctly, this is the intended use case for an NIO Promise?

This implementation currently doesn't go "wide": it runs all work on a specific event loop. Instead it seems like you probably want an event loop group and to use next() to obtain a loop to submit work to?

This is probably due to my lack of understanding on how NIO works. I thought whoever creates the future controls which EventLoop the work is run on? The actual jobs are being created by the AWS SDK, which itself has access to the EventLoopGroup. I assumed it would be creating jobs that run on different EventLoops? You may be wondering then why the work is running in a flatSubmit block – it probably doesn't need to be.

Thanks again

Yes and yes.

Your understanding is right: I assumed the flatSubmit was a mandatory part of your application. If it is not, you can disregard that advice.

1 Like

Thanks again for your help, Cory! I've got an implementation that seems quite reliable now so will try to battle test it next week.

I'm posting it here in case anyone else is interested in trying it out or poking holes in it. It should be thread safe by means of eventLoop.execute and .hop (and I haven't been able to break it in that regard) but I'm very interested to hear anyone's feedback on that.

import NIO

class ConcurrentWorkQueue<JobType> {
    private var currentlyRunningJobs: Int = 0
    private let maxConcurrentJobs: Int

    private var jobs: [() -> EventLoopFuture<JobType>] = []
    private var results: [JobType?] = []
    private var currentJobIndex: Int = 0

    private let eventLoop: EventLoop
    var resultPromise: EventLoopPromise<[JobType]>

    init(on eventLoop: EventLoop, maxConcurrentJobs: Int) {
        precondition(maxConcurrentJobs > 0)
        self.maxConcurrentJobs = maxConcurrentJobs

        self.eventLoop = eventLoop
        resultPromise = eventLoop.makePromise(of: [JobType].self)
    }

    func add(_ job: @escaping () -> EventLoopFuture<JobType>) {
        eventLoop.execute {
            self.jobs.append(job)
            self.results.append(nil)
            self.dequeue()
        }
    }

    private func dequeue() {
        if currentlyRunningJobs >= maxConcurrentJobs {
            return
        }

        if self.currentJobIndex >= self.jobs.count {
            // We may have started running all jobs (which would set currentJobIndex == jobs.count)
            // in which case we should avoid trying to queue any more work (which doesn't exist).

            // We're not necessarily finished running all of them yet though; only succeed when we are:
            if currentlyRunningJobs == 0 {
                resultPromise.succeed(results as! [JobType])
            }
            return
        }

        let currentJobIndex = self.currentJobIndex
        let newJob = jobs[currentJobIndex]

        self.currentJobIndex += 1
        self.currentlyRunningJobs += 1

        _ = newJob().hop(to: self.eventLoop).always { result in
            self.currentlyRunningJobs -= 1

            do {
                print("Completed job", currentJobIndex)
                self.results[currentJobIndex] = try result.get()
                self.dequeue()
            } catch {
                self.resultPromise.fail(error)
            }
        }
    }
}