Update: Vapor Queuing System

Hi all!

Here's an update on the Vapor queueing library (context here). I just released version 0.2.0, a fully functioning release that includes a Redis implementation as well as an example. Right now, the library has the following features:

  1. Safe handling of the SIGTERM signals that hosting providers send to indicate a shutdown, restart, or new deploy.
  2. Different queue priorities: a job can be assigned to the email queue, for example, rather than the data-processing queue.
  3. An implementation of the reliable queue process, to help recover from unexpected failures.
  4. A maxRetryCount option that retries a failing job until it succeeds or the specified count is reached.
  5. Use of NIO to spread jobs across all available cores and EventLoops.
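
Features 3 and 4 can be pictured with a small in-memory model. This is only a sketch of the general reliable-queue idea (a job is moved to a "processing" list when a worker takes it, and is deleted only once the work is done), not the library's Redis code. `ReliableQueue`, `runWithRetries`, and the reading of `maxRetryCount` as "retries after the first attempt" are my assumptions.

```swift
// Hypothetical in-memory model of a reliable queue; the real library keeps its data in Redis.
struct ReliableQueue {
    private(set) var pending: [String] = []
    private(set) var processing: [String] = []

    mutating func enqueue(_ job: String) {
        pending.append(job)
    }

    // Move the oldest job to `processing` instead of deleting it,
    // so it is not lost if the worker dies mid-job.
    mutating func reserve() -> String? {
        guard !pending.isEmpty else { return nil }
        let job = pending.removeFirst()
        processing.append(job)
        return job
    }

    // Called only after the job has finished (or has used up its retries).
    mutating func complete(_ job: String) {
        if let index = processing.firstIndex(of: job) {
            processing.remove(at: index)
        }
    }

    // After a crash, anything still in `processing` goes back to `pending`.
    mutating func recover() {
        pending.append(contentsOf: processing)
        processing.removeAll()
    }
}

// Assumption: maxRetryCount counts retries *after* the first attempt.
// Returns how many attempts were made and whether the work succeeded.
func runWithRetries(maxRetryCount: Int, _ work: () throws -> Void) -> (attempts: Int, succeeded: Bool) {
    var attempts = 0
    while attempts <= maxRetryCount {
        attempts += 1
        do {
            try work()
            return (attempts, true)
        } catch {
            continue // try again until the retry budget is spent
        }
    }
    return (attempts, false)
}
```

Keeping the in-flight job in a second list is what makes recovery possible: a worker that crashes between `reserve()` and `complete(_:)` leaves evidence behind that `recover()` can act on.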

The Code

Job protocol

Here's a simple example of what an EmailJob might look like:

struct EmailJob: Job {
    // Data the job needs when it runs.
    let to: String
    let from: String
    let message: String

    // Does the actual work of the job.
    func dequeue(context: JobContext, worker: EventLoopGroup) -> EventLoopFuture<Void> {
        print(message)
        return worker.future()
    }

    // Error handler for a failed job.
    func error(context: JobContext, error: Error, worker: EventLoopGroup) -> EventLoopFuture<Void> {
        print(error)
        return worker.future()
    }
}
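
Since a job is dispatched in one place and run later by a worker, its stored properties have to survive being serialized and read back. Here is a minimal sketch of that round trip using Foundation's `JSONEncoder`. `EmailPayload`, `serialize`, and `deserialize` are stand-ins I made up so the snippet runs without the library, and JSON is an assumption about the storage format.

```swift
import Foundation

// Stand-in for the stored properties of EmailJob (hypothetical type, not part of the library).
struct EmailPayload: Codable, Equatable {
    let to: String
    let from: String
    let message: String
}

// What a dispatcher might do: turn the job into bytes that can be pushed onto a queue.
func serialize(_ payload: EmailPayload) throws -> Data {
    try JSONEncoder().encode(payload)
}

// What a worker might do: rebuild the job from the stored bytes before running it.
func deserialize(_ data: Data) throws -> EmailPayload {
    try JSONDecoder().decode(EmailPayload.self, from: data)
}
```

Anything that cannot be encoded this way (an open connection, a live request) has no place among the job's properties.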

Dispatching

Here's an example Vapor controller utilizing the EmailJob:

final class JobsController: RouteCollection {
    let queue: QueueService
    
    init(queue: QueueService) {
        self.queue = queue
    }
    
    func boot(router: Router) throws {
        router.get("/queue", use: addToQueue)
    }
    
    func addToQueue(req: Request) throws -> Future<HTTPStatus> {
        let job = EmailJob(to: "to@to.com", from: "from@from.com", message: "message")
        return queue.dispatch(job: job, maxRetryCount: 10).transform(to: .ok)
    }
}

To dispatch to a specific queue, you would do this:

extension QueueType {
    static let emails = QueueType(name: "emails")
}

func addToQueue(req: Request) throws -> Future<HTTPStatus> {
    let job = EmailJob(to: "to@to.com", from: "from@from.com", message: "message")
    return queue.dispatch(job: job, maxRetryCount: 10, queue: .emails).transform(to: .ok)
}
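
The `.emails` shorthand works because `QueueType` exposes its named queues as static members. Below is a rough stand-alone sketch of that pattern with an in-memory dispatcher. `QueueName`, `InMemoryDispatcher`, and the `default` queue are invented for illustration and do not mirror the library's internals.

```swift
// Hypothetical mirror of QueueType: a thin wrapper around a queue name.
struct QueueName: Hashable {
    let name: String
    static let `default` = QueueName(name: "default")
}

// Apps add their own queues in an extension, just like `QueueType.emails` above.
extension QueueName {
    static let emails = QueueName(name: "emails")
}

// Keeps one FIFO list of job descriptions per queue name.
struct InMemoryDispatcher {
    private(set) var queues: [QueueName: [String]] = [:]

    // Falls back to the default queue when no queue is given.
    mutating func dispatch(_ job: String, queue: QueueName = .default) {
        queues[queue, default: []].append(job)
    }

    // A worker bound to one queue only ever sees that queue's jobs.
    mutating func next(from queue: QueueName) -> String? {
        guard var jobs = queues[queue], !jobs.isEmpty else { return nil }
        let job = jobs.removeFirst()
        queues[queue] = jobs
        return job
    }
}
```

Using a struct with static members rather than an enum lets applications add queues of their own without touching the library.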

Running workers

To run a new jobs worker, simply type:

vapor run jobs

Or, to run a worker for a specific queue:

vapor run jobs --queue emails
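
A worker started this way is the process that receives SIGTERM when the host shuts down or redeploys (feature 1 in the list at the top). The sketch below shows one common way to handle that in Swift with a Dispatch signal source: finish the job in hand, then stop taking new ones. `Worker`, `requestShutdown`, and the job strings are hypothetical, and this is not a copy of how the library itself is wired.

```swift
import Foundation
import Dispatch

// Hypothetical worker: checks a shutdown flag between jobs, never in the middle of one.
final class Worker: @unchecked Sendable {
    private let lock = NSLock()
    private var shuttingDown = false
    private var finished: [String] = []

    // Jobs that ran to completion.
    var processed: [String] {
        lock.lock(); defer { lock.unlock() }
        return finished
    }

    func requestShutdown() {
        lock.lock(); defer { lock.unlock() }
        shuttingDown = true
    }

    private var isShuttingDown: Bool {
        lock.lock(); defer { lock.unlock() }
        return shuttingDown
    }

    func run(jobs: [String], handle: (String) -> Void) {
        for job in jobs {
            if isShuttingDown { break } // stop before starting another job
            handle(job)
            lock.lock(); finished.append(job); lock.unlock()
        }
    }
}

let worker = Worker()

// Ignore the default SIGTERM action (immediate exit) and observe the signal via Dispatch instead.
signal(SIGTERM, SIG_IGN)
let sigterm = DispatchSource.makeSignalSource(signal: SIGTERM, queue: .global())
sigterm.setEventHandler { worker.requestShutdown() }
sigterm.resume()
```

The important design choice is that the flag is only read between jobs, so a job that has already started is allowed to finish before the process exits.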

Links:

I would love to hear feedback on the API, the code, and any suggestions overall. The code is tentatively production-ready, and I'm in the process of using my own project as a guinea pig. Huge thanks to Tanner, Cory, and Johannes for their patience while I asked many, many questions :)

What's next:

  • Possibly explore implementing scheduling functionality
  • Full documentation (PR coming soon to /docs)
  • Possibly a Fluent implementation
  • Full 1.0 release in conjunction with Vapor 4 release

Super excited about this package, thanks @jdmcd!

Absolutely, super pumped to have contributed!

Hey guys, I know it's been years since the last post here, but I'm struggling to figure out the best way to create a queue job that requires a Vapor.Request as a parameter. I'm trying to queue a web crawler that needs a Request, but I can't make the job Codable as long as it has a Request as a payload var.

You don't want the Request as part of the job, as there's no request when the job is run. Why do you need the request as a parameter? If it's to access the client, you can use context.application.client.
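
A sketch of that advice, kept free of Vapor so it runs on its own: the job stores only Codable data (the URL to crawl) and takes the HTTP client from the context the worker supplies at run time. `HTTPClientLike`, `WorkerContext`, `CrawlPayload`, `CrawlJob`, and `RecordingClient` are hypothetical stand-ins; in a real app the client would come from context.application.client.

```swift
import Foundation

// Hypothetical stand-in for an HTTP client that a worker context would provide.
protocol HTTPClientLike {
    func get(_ url: String) -> String
}

// Hypothetical stand-in for the context passed to a job when it runs.
struct WorkerContext {
    let client: HTTPClientLike
}

// Only plain, Codable data is stored with the job; no Request, no client.
struct CrawlPayload: Codable {
    let url: String
}

struct CrawlJob {
    // The client is looked up when the job runs, not when it is dispatched.
    func dequeue(context: WorkerContext, payload: CrawlPayload) -> String {
        context.client.get(payload.url)
    }
}

// Test double: records which URLs were requested and returns a canned body.
final class RecordingClient: HTTPClientLike {
    private(set) var requested: [String] = []

    func get(_ url: String) -> String {
        requested.append(url)
        return "<html>stub</html>"
    }
}
```

A side benefit of this shape is that the job can be exercised in tests with a stub client, with no server or request involved.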


It's to access the client. I'll use context.application.client and let you know if I have issues. Thank you, Tim!

and working great :+1: