Pitch: Queuing System

Pitch: Vapor-Based Job Queuing System

Currently there exists no standardized way to queue up future jobs (either on a recurring basis or one-off jobs) in Vapor. SwiftNIO provides an API around creating a scheduled task, but it doesn't fit nicely into Vapor's Services architecture and requires the user to develop their own method of persisting the job metadata.

This pitch outlines what an officially supported queuing package might look like for the Vapor ecosystem.

Problem

In order to queue a job right now, users have a few main options:

  1. Use SwiftNIOs scheduled task API. The problem with this is that it requires that the user develop their own persistence and checking model which could lead to cumbersome implementation.
  2. Use a package like GitHub - BrettRToomey/Jobs: A job system for Swift backends., which doesn't provide any type of persistence layer other than storing the jobs in memory.
  3. Use a service like https://cron-job.org or Heroku's dyno scheduler to run a task every x number of minutes via an endpoint or command.

None of the above solutions integrate nicely with things like Vapor's services architecture or the ability to run queued jobs from the cli like vapor run jobs.

Goals/Features

  1. Failsafe
    • Jobs should be persisted to some type of backing store so that a restart or new deploy of the server does not cause job information to be lost.
  2. Generic
    • Similar to Fluent, this package should allow the user the user to specify which type of database they want to use to store the job information.
  3. Provide a retrying mechanism
    • The package should allow the user to specify if the job should be retried upon failure, and if so, how many times. I imagine that any error thrown during the process would trigger a failure unless explicitly caught and handled by the job.
  4. Transparent handling of horizontal scaling
    • This one is up for discussion. When a service horizontally scales their main servers, it could cause problems with jobs being run multiple times. One potential solution could be to track which jobs have been run already in a database.
  5. Simple and powerful API
    • The package should take advantage of Vapor's Service architecture to provide a simple way to queue jobs from inside of controllers.

The Job Protocol

I propose a Job protocol that looks something like this:

public protocol Job: Codable {
    func run(container: Container) throws -> Future<Void>
}

There may be an argument for also having a separate onError method as well. That might look something like:

public protocol Job: Codable {
    func run(container: Container) throws -> Future<Void>
    func onError(container: Container, error: Error) throws -> Future<Void>
}

extension Job {
    func onError(container: Container, error: Error) throws -> Future<Void> {
        return container.future()
    }
}

onError would be called if run throws. This may be useful for sending an email with an error message, for example. This could also be manually implemented by the user in the run function by using catchFlatMap.

An example client implementation could look like this:

struct EmailJob: Job {
    let to: String
    let subject: String
    let message: String
    
    func run(container: Container) throws -> EventLoopFuture<Void> {
        let emailService = try container.make(EmailService.self)
        return try emailService
            .sendEmail(to: to, subject: subject, message: messagae)
            .transform(to: ())
    }
}

There should also be some type of QueueService that allows a user to register a job:

let job = EmailJob(to: "myemail@email.com", subject: "Email subject", message: "My message content")
let queueService = try container.make(QueueService.self)
try queueService.add(job: job, configuration: .oneOff(retryOnError: true, retryAttempts: 5))
public enum QueueConfiguration: Codable {
    /// A job that is queued for execution immediately 
    case oneOff(retryOnError: Bool, retryAttempts: Int?)

    /// A job that is scheduled to be run once at some point in the future 
    case scheduled(runOn: Date, retryOnError: Bool, retryAttempts: Int?)
    
    /// A repeating job 
    case repeating(startOn: Date, 
                   interval: Double, 
                   stopOn: Date?,
                   stopAfter: Int?
                   retryOnError: Bool, 
                   retryAttempts: Int?)

    // This would also need to implement a custom coder as I don't believe enums with associated values are Codable by default. 
}

The package would then encode all of this information into an object like this, and store it in the database/redis instance:

struct JobData: Codable {
    let job: Job
    let configuration: QueueConfiguration 
}

Things to consider/discuss

  1. Name
    • Personally, I prefer something simple like "Jobs" but this community has an affection for packages named along the Vapor brand, so leaving this open for discussion.
  2. Initial database wrappers
    • There should be some discussion around which database wrappers should be officially provided with the initial release of the package, which are planned, and which should be pushed to the community for support. I think that a good starting place would be releasing an official Redis implementation, but there may be some who would want to use PostgreSQL or FoundationDB too.
  3. A Job will need access to, at the very least, a Container. How and where does that get injected?

Future iterations

Eventually, it would be fantastic to see something for the Vapor community similar to Laravel's Horizon (Laravel Horizon - Laravel - The PHP Framework For Web Artisans). I think that queuing is one of the last remaining major outstanding ecosystem problems that need to be solved, and I think a Job protocol as described above is a solid step towards that.

6 Likes

I’m not sure if this needs to be Vapor specific. Could it just be based on SwiftNIO and in the future submitted to the SSWG? Or perhaps a core library not vapor specific with a wrapper for vapor.

I do agree that a library such as this is very important. (Also see Ruby’s sidekiq for inspiration, that creator also has some docs discussing his design decisions)

Yeah, I thought about pitching it to SSWG. The problem with that is that I don’t really think it’s generic enough at that level. The overall swift community already has access to NIO’s helpers, and at the end of the day this would really just be persistence on top of that. Since persistence packages haven’t been standardized yet, I’m not sure how that would work. Also I think it’s important to have access to a Container in the Jobs protocol and that’s a Vapor type. Hope that clarifies the motivation, happy to revise if we can solve those problems.

Could we perhaps use writing to a file as a base implementation, and provide hooks to use a database instead?

That would match the behavior of the Logger protocol, which just does stdout without a more intense implementation.

That would get it agnostic enough to try and propose to the SSWG

Edit: I see the strong use case of using a database - but right now some of the jobs I have - I don't care if they're lost when the app starts/stops repeatedly.

If I have to hook up a database, that would make a hard edge to just getting started with it.

Yeah, I could see something like that. Still worried about the ability to inject a Container though.

I think it would be nice to let the developer choose how many job runners they would like. For example, if you have your app scaled to 10 instances behind a load balancer, you might want to use 2 instances for running jobs. 12 instances total. For the 10 app instances, you would use vapor run serve to boot. And for the 2 jobs instances, you would use vapor run jobs to boot. This would mean the jobs package would need to have support for ensuring jobs are only ever dequeued by a single job runner, so that a single job doesn't run twice.

Given service caching is going away in Vapor 4, I think it might be better to pass something like a userInfo: [String: Any] object here. Something like a JobContext that convenience accessors can be added to would be nice. That way jobs don't need to initialize new services each time they are run.

With this method, the email job would look something like:

struct EmailJob: Job {
    let to: String
    let subject: String
    let message: String
    
    func run(_ ctx: JobCtx) throws -> EventLoopFuture<Void> {
        return try ctx.email
            .sendEmail(to: to, subject: subject, message: message)
            .transform(to: ())
    }
}

// in configure.swift

services.register(JobCtx.self) { c in 
    var ctx = JobCtx()
    ctx.email = try c.make(EmailService.self)
    return ctx
}

extension JobCtx {
    var email: EmailService {
         // access from userInfo dictionary, and cast
         get { ... }
         set { ... } 
    }
}

Whenever a job is run, the jobs package would just need to ask the container to create a JobCtx before running. This could be stored per event loop so that things like connection pools get properly re-used.

I like the overall structure, but I would have this be a struct with static methods that emulate an enum. That way it will be possible to add new cases in the future without being a breaking change. I don't think a consumer of this package would ever need to switch over this enum anyway.

Big fan of switching from Container to user info.

It could be possible to break out some components from this that are not framework specific, but I'm not sure there would be a big enough advantage to that. To make something like a queueing system easy to use, there needs to be deep integration into the framework with things like commands, database, controllers, etc. It would probably be easier to just dupe the small amount of generic bits for another framework's implementation. I could be wrong though. Either way, it will al ways be possible to pull bits out into a generic package in the future.

For horizontally scaled applications, this would require having a shared drive. I'm not sure how common that is. As far as I'm aware, most people use Redis or their SQL db for things like this.

Sure, but that's where it's called out to add an integration with a database.

My use case does not have a horizontally scaled application

Redis is definitely my go to. Not opposed to trying to find a way to make it SSWG applicable, but it might not be worth the trouble. Especially since the protocol itself is really simple

Thanks, i think you're both probably right. Like you said tanner if it turns out it would be useful we could pull it out into a more generic package later.

It probably isn't necessary for an initial release, but it'd be nice if the library supported multiple weighted queues at some point.

1 Like

Minor observation: This feels quite a bit like the Operation API inside Foundation.

If you're going down that route, I'd highly recommend scoping out what Operation's capabilities are.

1 Like

Good to know, I’ve not encountered that API. When you say to scope it out do you mean as a potential synergistic solution or for inspiration?

Side note - great to see you on this topic. I refer to your date fallacy and better MVC stuff pretty frequently :)

1 Like

By "scope it out" I mean to take a look at its API and draw inspiration from how it works and what it does.

At a high-level, Foundation.Operation is the encapsulation of a piece of work to be executed. It gets executed on an OperationQueue; you can kind of think about it as an object-oriented wrapper around a closure and DispatchQueue.

In addition to the notion of "execute work on a queue", Operation also includes the idea of "readiness", which is the notion that certain conditions must be met before the work can begin. The main form of "readiness" is the idea of a dependency: if operationB "depends" operationA, then operationB will not execute until after operationA has finished executing, regardless of which OperationQueue they're respectively enqueued on.

If you've enjoyed my stuff on calendars and MVC, might I suggest a WWDC video I did on Operations? It goes over the fundamentals of how they work, as well as a number of expansions: WWDC 2015: Advanced NSOperations

I can point you to a number of open source projects that expand on Operation. Additionally, this is an area in which @Philippe_Hausler knows quite a bit, as he's the current maintainer of the Operation infrastructure in Foundation.framework. There are a lot legacy behaviors in Operation that are less-than-desireable given the current state of the art, and this might be a really fascinating way to rectify them.

I've used ProcedureKit in the years since that presentation and it has evolved on top of the ideas presented, as well as working around the various Operation bugs found in that time. That could be another source of inspiration for dependency API.

Making a jobs library modeled similarly to Foundation.Operation is an interesting idea.

I've seen other job libraries have some idea of a jobs that are dependent on each other, such as Sidekiq's Superworkers, but the way Foundation.Operation models things like this is very nice and has some useful features like being able to pass the result of some operation to a dependent operation (I think ProcedureKit does this as well in a different way) as well as being able to know what operations can be done in parallel based on the dependency graph and doesn't require this being explicitly defined.

One of the issues you can run into with the way Foundation.Operation models dependent operations is that you can create circular-dependent operations that will result if none of them being able to execute. I'm guessing this could fill up a job queue pretty quickly if that error was made.

Thanks for all of the great resources, everyone! I've got a lot of reading and researching to do. I'm going to spend some time looking into all of this and will report back when I have a more concrete implementation plan. I would say the goal is to get this going as soon as possible so I'll probably try and put together some type of proof of concept this week and then expand on it to include some of the cool features mentioned in @davedelong's talk and ProcedureKit.

I've been talking to a friend who uses Laravel (their queueing system is really great) and he was telling me about the difference between queuing and scheduling in their systems. I think that's a really important distinction. So, what I'm thinking instead is removing the ability to run jobs at certain times and possibly also removing repeating jobs. The add(job: ) function would be exactly that - just adding a job to the queue that'll be run when it gets the chance.

Do people have feedback on that idea? That way instead the system would become more of a background processor that could be expanded upon with a separate scheduling system.

1 Like

Thanks to the brilliance of the people who hang out in the Vapor community, I've got an MVP + example up and running. Would love some feedback on what you think, what should come next, API design, etc. A few notes:

  1. This doesn't implement retrying yet
  2. I believe the current lpush Redis system eliminates the need for a database mechanism to track which jobs are done since whichever queue process gets to that item in the queue first will pop it off and make it unavailable for any other horizontal instances.
  3. The goal is to split out the current Redis implementation into a separate package and make the JobsPersistenceLayer protocol public so that other providers can be added.
  4. I want to add a config structure similar to MigrationsConfig or DatabasesConfig instead of doing everything statically, but this works for now.

Package: GitHub - vapor/queues: A queue system for Vapor.
Example: GitHub - jdmcd/jobs-example

Instructions:

  1. Clone the example repo
  2. Run vapor update -y
  3. Make sure you have a redis server running locally
  4. Start the server and hit http://localhost:8080/queue a bunch of times to add some jobs to the queue
  5. Run vapor build in a new terminal window
  6. Run vapor run jobs
2 Likes