Update: Vapor Queuing System


(Jimmy McDermott) #1

Hi all!

Update on the Vapor queueing library (context here). I just released version 0.2.0. It's a fully functioning version with a Redis implementation as well as an example. Right now, the library has the following features:

  1. Safe handling of SIGTERM signals sent by hosting providers to indicate a shutdown, restart, or new deploy.
  2. Different queue priorities. Allows specifying a job to be on the email queue, for example, versus the data-processing queue.
  3. Implements the reliable queue process to help with unexpected failures.
  4. Includes a maxRetryCount feature that will repeat the job until it succeeds up until a specified count.
  5. Uses NIO to utilize all available cores and EventLoops for jobs.

The Code

Job protocol

Here's a simple example of what an EmailJob might look like:

struct EmailJob: Job {
    let to: String
    let from: String
    let message: String
    
    func dequeue(context: JobContext, worker: EventLoopGroup) -> EventLoopFuture<Void> {
        print(message)
        return worker.future()
    }
    
    func error(context: JobContext, error: Error, worker: EventLoopGroup) -> EventLoopFuture<Void> {
        print(error)
        return worker.future()
    }
}

Dispatching

Here's an example Vapor controller utilizing the EmailJob:

final class JobsController: RouteCollection {
    let queue: QueueService
    
    init(queue: QueueService) {
        self.queue = queue
    }
    
    func boot(router: Router) throws {
        router.get("/queue", use: addToQueue)
    }
    
    func addToQueue(req: Request) throws -> Future<HTTPStatus> {
        let job = EmailJob(to: "to@to.com", from: "from@from.com", message: "message")
        return queue.dispatch(job: job, maxRetryCount: 10).transform(to: .ok)
    }
}

To dispatch to a specific queue, you would do this:

extension QueueType {
    static let emails = QueueType(name: "emails")
}

func addToQueue(req: Request) throws -> Future<HTTPStatus> {
    let job = EmailJob(to: "to@to.com", from: "from@from.com", message: "message")
    return queue.dispatch(job: job, maxRetryCount: 10, queue: .emails).transform(to: .ok)
}

Running workers

To run a new jobs worker, simply type:

vapor run jobs

Or, to run a worker for a specific queue:

vapor run jobs --queue emails

Links:

I would love to hear feedback on the API, the code, and any suggestions overall. The code is tentatively production ready and I'm in the process of using my own project as a guinea pig. Huge thanks to Tanner, Cory, and Johannes for their patience while I asked many, many questions :)

What's next:

  • Possibly explore implementing scheduling functionality
  • Full documentation (PR coming soon to /docs)
  • Possibly a Fluent implementation
  • Full 1.0 release in conjunction with Vapor 4 release

(Tanner) #2

Super excited about this package, thanks @mcdappdev!


(Jimmy McDermott) #3

Absolutely, super pumped to have contributed!