Pitch: Vapor-Based Job Queuing System
Currently there exists no standardized way to queue up future jobs (either on a recurring basis or one-off jobs) in Vapor. SwiftNIO provides an API around creating a scheduled task, but it doesn't fit nicely into Vapor's Services architecture and requires the user to develop their own method of persisting the job metadata.
This pitch outlines what an officially supported queuing package might look like for the Vapor ecosystem.
Problem
In order to queue a job right now, users have a few main options:
- Use SwiftNIO's scheduled task API. The problem is that the user has to develop their own persistence and checking model, which can lead to a cumbersome implementation.
- Use a package like BrettRToomey/Jobs (a job system for Swift backends), which doesn't provide any persistence layer other than storing the jobs in memory.
- Use a service like https://cron-job.org or Heroku's dyno scheduler to run a task every x number of minutes via an endpoint or command.
None of the above solutions integrate nicely with things like Vapor's services architecture or the ability to run queued jobs from the CLI, like vapor run jobs.
Goals/Features
- Failsafe
  - Jobs should be persisted to some type of backing store so that a restart or new deploy of the server does not cause job information to be lost.
- Generic
  - Similar to Fluent, this package should allow the user to specify which type of database they want to use to store the job information.
- Provide a retrying mechanism
  - The package should allow the user to specify whether a job should be retried upon failure and, if so, how many times. I imagine that any error thrown during the process would trigger a failure unless explicitly caught and handled by the job.
- Transparent handling of horizontal scaling
  - This one is up for discussion. When a service horizontally scales its main servers, the same job could end up being run multiple times. One potential solution could be to track which jobs have already been run in a database.
- Simple and powerful API
  - The package should take advantage of Vapor's Service architecture to provide a simple way to queue jobs from inside of controllers.
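To make the retry goal a bit more concrete, here is a minimal sketch of how a worker might apply retryOnError and retryAttempts. The helper name runWithRetries is hypothetical, the code is synchronous for brevity (a real implementation would presumably work with futures and persist the attempt count), and treating a nil retryAttempts as "no retries" is an assumption, not part of the pitch.

```swift
/// Hypothetical helper: runs `work`, retrying on failure.
/// Returns the number of attempts made; rethrows the last error once attempts run out.
/// Assumption: a nil `retryAttempts` means "no retries".
@discardableResult
func runWithRetries(retryOnError: Bool,
                    retryAttempts: Int?,
                    work: () throws -> Void) throws -> Int {
    // One initial attempt, plus the configured retries when retrying is enabled.
    let maxAttempts = 1 + (retryOnError ? max(retryAttempts ?? 0, 0) : 0)
    var attempt = 0
    while true {
        attempt += 1
        do {
            try work()
            return attempt
        } catch {
            // Any thrown error counts as a failure; give up once attempts are exhausted.
            if attempt >= maxAttempts { throw error }
        }
    }
}
```

With retryOnError set to true and retryAttempts set to 5, a job that keeps failing would be tried six times in total before the error is surfaced.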
The Job Protocol
I propose a Job protocol that looks something like this:
public protocol Job: Codable {
    func run(container: Container) throws -> Future<Void>
}
There may also be an argument for having a separate onError method. That might look something like:
public protocol Job: Codable {
    func run(container: Container) throws -> Future<Void>
    func onError(container: Container, error: Error) throws -> Future<Void>
}
extension Job {
    // Default implementation: do nothing when a job fails.
    // Marked public so conforming types in other modules can rely on it.
    public func onError(container: Container, error: Error) throws -> Future<Void> {
        return container.future()
    }
}
onError would be called if run throws. This may be useful for sending an email with an error message, for example. This could also be implemented manually by the user in the run function by using catchFlatMap.
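As a rough sketch of that flow, the worker could wrap each run call and hand any thrown error to onError. The SketchJob protocol below is a simplified, synchronous stand-in for the pitched Job protocol (no Container or Future, so it compiles without Vapor), and execute is a hypothetical name.

```swift
/// Simplified, synchronous stand-in for the pitched Job protocol.
protocol SketchJob {
    func run() throws
    func onError(_ error: Error)
}

extension SketchJob {
    /// Default: do nothing, mirroring the pitched default implementation.
    func onError(_ error: Error) {}
}

/// Hypothetical worker step: run the job and pass any thrown error to `onError`.
/// Returns true on success so the caller can decide whether to retry.
func execute<J: SketchJob>(_ job: J) -> Bool {
    do {
        try job.run()
        return true
    } catch {
        job.onError(error)
        return false
    }
}
```

Because onError is a protocol requirement with a default, jobs that don't care about failures can skip it, while a job like an email sender could override it to report the error.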
An example client implementation could look like this:
struct EmailJob: Job {
    let to: String
    let subject: String
    let message: String
    func run(container: Container) throws -> Future<Void> {
        let emailService = try container.make(EmailService.self)
        return try emailService
            .sendEmail(to: to, subject: subject, message: message)
            .transform(to: ())
    }
}
There should also be some type of QueueService that allows a user to register a job:
let job = EmailJob(to: "myemail@email.com", subject: "Email subject", message: "My message content")
let queueService = try container.make(QueueService.self)
try queueService.add(job: job, configuration: .oneOff(retryOnError: true, retryAttempts: 5))

public enum QueueConfiguration: Codable {
    /// A job that is queued for execution immediately
    case oneOff(retryOnError: Bool, retryAttempts: Int?)
    /// A job that is scheduled to be run once at some point in the future
    case scheduled(runOn: Date, retryOnError: Bool, retryAttempts: Int?)
    /// A repeating job
    case repeating(startOn: Date,
                   interval: Double,
                   stopOn: Date?,
                   stopAfter: Int?,
                   retryOnError: Bool,
                   retryAttempts: Int?)
    // This would also need to implement a custom coder on toolchains before Swift 5.5,
    // where enums with associated values do not get synthesized Codable conformance.
}
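For reference, the custom coder mentioned in that comment could be sketched roughly as follows. This is one possible encoding, not a settled format: the flat key layout, the kind discriminator, and the private Keys and Kind helper types are all assumptions made for illustration.

```swift
import Foundation

enum QueueConfiguration: Codable, Equatable {
    case oneOff(retryOnError: Bool, retryAttempts: Int?)
    case scheduled(runOn: Date, retryOnError: Bool, retryAttempts: Int?)
    case repeating(startOn: Date, interval: Double, stopOn: Date?, stopAfter: Int?,
                   retryOnError: Bool, retryAttempts: Int?)

    // Hypothetical flat key layout; `kind` records which case was encoded.
    private enum Keys: String, CodingKey {
        case kind, runOn, startOn, interval, stopOn, stopAfter, retryOnError, retryAttempts
    }
    private enum Kind: String, Codable { case oneOff, scheduled, repeating }

    init(from decoder: Decoder) throws {
        let c = try decoder.container(keyedBy: Keys.self)
        let kind = try c.decode(Kind.self, forKey: .kind)
        // The retry settings are shared by every case.
        let retry = try c.decode(Bool.self, forKey: .retryOnError)
        let attempts = try c.decodeIfPresent(Int.self, forKey: .retryAttempts)
        switch kind {
        case .oneOff:
            self = .oneOff(retryOnError: retry, retryAttempts: attempts)
        case .scheduled:
            let runOn = try c.decode(Date.self, forKey: .runOn)
            self = .scheduled(runOn: runOn, retryOnError: retry, retryAttempts: attempts)
        case .repeating:
            let startOn = try c.decode(Date.self, forKey: .startOn)
            let interval = try c.decode(Double.self, forKey: .interval)
            let stopOn = try c.decodeIfPresent(Date.self, forKey: .stopOn)
            let stopAfter = try c.decodeIfPresent(Int.self, forKey: .stopAfter)
            self = .repeating(startOn: startOn, interval: interval, stopOn: stopOn,
                              stopAfter: stopAfter, retryOnError: retry, retryAttempts: attempts)
        }
    }

    func encode(to encoder: Encoder) throws {
        var c = encoder.container(keyedBy: Keys.self)
        switch self {
        case let .oneOff(retry, attempts):
            try c.encode(Kind.oneOff, forKey: .kind)
            try c.encode(retry, forKey: .retryOnError)
            try c.encodeIfPresent(attempts, forKey: .retryAttempts)
        case let .scheduled(runOn, retry, attempts):
            try c.encode(Kind.scheduled, forKey: .kind)
            try c.encode(runOn, forKey: .runOn)
            try c.encode(retry, forKey: .retryOnError)
            try c.encodeIfPresent(attempts, forKey: .retryAttempts)
        case let .repeating(startOn, interval, stopOn, stopAfter, retry, attempts):
            try c.encode(Kind.repeating, forKey: .kind)
            try c.encode(startOn, forKey: .startOn)
            try c.encode(interval, forKey: .interval)
            try c.encodeIfPresent(stopOn, forKey: .stopOn)
            try c.encodeIfPresent(stopAfter, forKey: .stopAfter)
            try c.encode(retry, forKey: .retryOnError)
            try c.encodeIfPresent(attempts, forKey: .retryAttempts)
        }
    }
}
```

On Swift 5.5 and later the compiler can synthesize Codable for enums like this, so a hand-written coder would mainly matter for older toolchains or for controlling the stored format.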
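The package would then encode all of this information into an object like this and store it in the database or Redis instance:
struct JobData: Codable {
    // Note: a protocol-typed property such as Job blocks synthesized Codable conformance,
    // so in practice the job would need to be stored in a type-erased or encoded form.
    let job: Job
    let configuration: QueueConfiguration
}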
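One wrinkle with JobData is that job is typed as the Job protocol, and the compiler cannot synthesize Codable for a protocol-typed property. A possible workaround, sketched below under stated assumptions, is to store the job's type name next to its encoded payload and look the concrete type up again when loading. StorableJob, StoredJob, and JobRegistry are hypothetical names, and StorableJob is a simplified stand-in for Job so the example compiles without Vapor.

```swift
import Foundation

/// Simplified stand-in for the pitched Job protocol (no Container or Future).
protocol StorableJob: Codable {
    func run() throws
}

/// Hypothetical storage record: the job's type name plus its JSON-encoded fields.
struct StoredJob: Codable {
    let typeName: String
    let payload: Data
}

/// Hypothetical registry that maps a stored type name back to a concrete decoder.
struct JobRegistry {
    private var decoders: [String: (Data) throws -> StorableJob] = [:]

    /// Registers a concrete job type so that stored records of it can be decoded later.
    mutating func register<J: StorableJob>(_ type: J.Type) {
        decoders[String(describing: type)] = { data in
            try JSONDecoder().decode(J.self, from: data)
        }
    }

    /// Encodes a job into a record that is itself Codable.
    func store<J: StorableJob>(_ job: J) throws -> StoredJob {
        let payload = try JSONEncoder().encode(job)
        return StoredJob(typeName: String(describing: J.self), payload: payload)
    }

    /// Decodes a stored record; returns nil if its type was never registered.
    func load(_ stored: StoredJob) throws -> StorableJob? {
        guard let decode = decoders[stored.typeName] else { return nil }
        return try decode(stored.payload)
    }
}
```

The trade-off is that every job type has to be registered at boot, which might fit naturally into Vapor's configuration step, but that is open for discussion.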
Things to consider/discuss
- Name
  - Personally, I prefer something simple like "Jobs" but this community has an affection for packages named along the Vapor brand, so leaving this open for discussion.
- Initial database wrappers
  - There should be some discussion around which database wrappers should be officially provided with the initial release of the package, which are planned, and which should be pushed to the community for support. I think that a good starting place would be releasing an official Redis implementation, but there may be some who would want to use PostgreSQL or FoundationDB too.
- A Job will need access to, at the very least, a Container. How and where does that get injected?
Future iterations
Eventually, it would be fantastic to see something for the Vapor community similar to Laravel's Horizon. I think that queuing is one of the last major outstanding problems in the ecosystem, and I think a Job protocol as described above is a solid step towards solving it.