
THE SALSIFY SMARTER ENGINEERING BLOG

Adding Job Groups to Delayed Jobs

Posted by Joel Turkel

Jul 29, 2013 5:47:00 PM

Recently I blogged about the Delayed Job plugin mechanism. In this post we'll explore how to use that mechanism to add job groups to Delayed Job, allowing us to:

  • Cancel all jobs in a job group
  • Block and unblock all jobs in a job group
  • Run additional processing after all jobs in a job group complete

At Salsify we've found that job groups provide a great abstraction for our approach to parallelizing bulk imports. The import process is broken up into sequential stages (e.g. importing products before importing relationships between those products), where each stage is a job group. Each stage is then broken up into smaller jobs that can run in parallel. With that context, let's dive into job groups...

UPDATE: We've open sourced this job group plugin. Details can be found at https://github.com/salsify/delayed_job_groups_plugin.

Example Job Group Usage

Before we start working on our plugin implementation let's take a quick look at the API we'll be building towards. Here's a simple example that shows creating a job group, adding jobs to the job group, and then later canceling the job group:
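
As a minimal sketch of that flow, the plain-Ruby model below uses a hypothetical SketchJobGroup class that keeps its jobs in memory. It is not the plugin's implementation, and the method names enqueue, mark_queueing_complete, and cancel are assumptions chosen to match the operations described above.

```ruby
# Hypothetical in-memory stand-in for a job group; not the real plugin.
class SketchJobGroup
  attr_reader :jobs

  def initialize
    @jobs = []
    @queueing_complete = false
  end

  # Adds a job to the group. A job is any object that responds to #perform.
  def enqueue(job)
    raise 'queueing is already complete' if @queueing_complete
    @jobs << job
    job
  end

  # Signals that no more jobs will be added to the group.
  def mark_queueing_complete
    @queueing_complete = true
  end

  def queueing_complete?
    @queueing_complete
  end

  # Cancels the group by discarding every job that has not run yet.
  def cancel
    @jobs.clear
  end
end

# Hypothetical job class used only for this illustration.
PrintJob = Struct.new(:message) do
  def perform
    puts message
  end
end

job_group = SketchJobGroup.new
job_group.enqueue(PrintJob.new('import products'))
job_group.enqueue(PrintJob.new('import relationships'))
job_group.mark_queueing_complete

# Time passes...
job_group.cancel
```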

 

We can also create a job group in a blocked state which will prevent any jobs from running until the job group is unblocked:
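
A rough way to picture the blocked state is the in-memory model below. BlockableJobGroup, its blocked: keyword, and runnable_jobs are hypothetical names for illustration; in the real plugin the blocked flag lives in the database and is consulted when workers reserve jobs.

```ruby
# Hypothetical in-memory model of a blocked job group; not the real plugin.
class BlockableJobGroup
  def initialize(blocked: false)
    @blocked = blocked
    @jobs = []
  end

  def blocked?
    @blocked
  end

  def enqueue(job)
    @jobs << job
  end

  # Jobs a worker would be allowed to pick up right now.
  def runnable_jobs
    blocked? ? [] : @jobs.dup
  end

  # Releases every job in the group so workers can run them.
  def unblock
    @blocked = false
  end
end

job_group = BlockableJobGroup.new(blocked: true)
job_group.enqueue(-> { puts 'import products' })
puts job_group.runnable_jobs.size # nothing may run while the group is blocked

job_group.unblock
job_group.runnable_jobs.each(&:call)
```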

 

Finally we can register a job to run after all jobs in a job group have completed:
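
The sketch below models that behavior in plain Ruby. CompletionJobGroup, on_completion_job:, and work_off_one are hypothetical names, and jobs are simple lambdas rather than Delayed Job payloads. Note how the queueing complete flag keeps the completion job from firing before every job has been added.

```ruby
# Hypothetical in-memory model of an on-completion job; not the real plugin.
class CompletionJobGroup
  def initialize(on_completion_job:)
    @on_completion_job = on_completion_job
    @pending = []
    @queueing_complete = false
  end

  def enqueue(job)
    @pending << job
  end

  # Without this flag an empty group would look "complete" too early.
  def mark_queueing_complete
    @queueing_complete = true
    run_completion_job_if_done
  end

  # Runs one pending job, then checks whether the whole group has finished.
  def work_off_one
    job = @pending.shift
    return unless job
    job.call
    run_completion_job_if_done
  end

  private

  def run_completion_job_if_done
    return unless @queueing_complete && @pending.empty? && @on_completion_job
    job = @on_completion_job
    @on_completion_job = nil # make sure the completion job runs only once
    job.call
  end
end

log = []
group = CompletionJobGroup.new(on_completion_job: -> { log << 'send import report' })
group.enqueue(-> { log << 'import products' })
group.enqueue(-> { log << 'import relationships' })
group.mark_queueing_complete
2.times { group.work_off_one }
puts log
```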

 

Now that we have a better sense of what we're building, let's take a look at how to implement it.

Job Group Implementation

We'll start by defining the JobGroup class that's responsible for the overall control of the job group. JobGroup is an ActiveRecord model that has_many Jobs and has attributes for the completion job, the completion job options, a blocked flag, and a queueing complete flag:

 

There are a few things worth noting about the JobGroup implementation:

  • Delayed Job deletes jobs after they complete and sets a failed_at date if they fail (assuming you've configured Delayed Job to keep failed jobs). Thus we can tell whether the job group is complete by checking for the existence of any jobs that do not have a failed_at date.
  • JobGroups are destroyed after they finish. This mirrors how the Delayed::Job class works and obviates the need for a separate process to periodically delete completed jobs.
  • We use pessimistic locking to avoid race conditions when multiple threads update the job group concurrently. For example, two threads could both detect that all jobs in the job group have completed and both decide that the completion job should be run.
  • JobGroup.check_for_completion is called after each job completes, so we optimize it for the common case where the job group is not yet complete by avoiding locking and loading the job group.
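
The last two points can be illustrated with a small threaded sketch. This is an in-memory analogy under stated assumptions, not the ActiveRecord code: LockedJobGroup is hypothetical, a Mutex stands in for the database row lock, and a counter stands in for the query that looks for unfinished jobs.

```ruby
# Hypothetical in-memory sketch of the completion check; not the real model.
class LockedJobGroup
  attr_reader :completion_runs

  def initialize(job_count)
    @jobs_lock = Mutex.new  # protects the counter (stands in for row deletes)
    @group_lock = Mutex.new # stands in for the pessimistic lock on the group
    @remaining = job_count
    @finished = false
    @completion_runs = 0
  end

  # Called by a worker thread when one job in the group finishes.
  def job_finished
    @jobs_lock.synchronize { @remaining -= 1 }
    check_for_completion
  end

  def check_for_completion
    # Fast path: skip the lock while unfinished jobs remain (the common case).
    return if @remaining > 0

    @group_lock.synchronize do
      # Re-check under the lock: another thread may have finished the group.
      next if @finished || @remaining > 0
      @finished = true
      @completion_runs += 1 # stands in for enqueueing the completion job
    end
  end
end

group = LockedJobGroup.new(8)
8.times.map { Thread.new { group.job_finished } }.each(&:join)
puts group.completion_runs
```

Even if several threads pass the fast path at the same moment, only the first one to take the lock sees an unfinished group, so the completion step runs once.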

Next up we need to monkey patch Delayed::Job so it knows about a JobGroup and can be marked as blocked:
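
To show the shape of such a patch without depending on Delayed Job itself, the sketch below reopens a hypothetical SketchJob class and mixes in group awareness. The JobGroupAwareness module, the job_group_id and blocked attributes, and the reservable helper are illustrative assumptions; the real patch works on the ActiveRecord-backed Delayed::Job.

```ruby
# Stand-in for a job class owned by another library (hypothetical).
class SketchJob
  attr_reader :name

  def initialize(name)
    @name = name
  end
end

# Behavior we want to bolt on: a link to a job group and a blocked flag.
module JobGroupAwareness
  attr_accessor :job_group_id, :blocked

  def blocked?
    !!blocked
  end

  def in_job_group?
    !job_group_id.nil?
  end
end

# The "monkey patch": reopen the existing class and include the module.
class SketchJob
  include JobGroupAwareness
end

# Workers should only reserve jobs that are not blocked.
def reservable(jobs)
  jobs.reject(&:blocked?)
end

blocked_job = SketchJob.new('import relationships')
blocked_job.job_group_id = 1
blocked_job.blocked = true
standalone_job = SketchJob.new('send email')

puts reservable([blocked_job, standalone_job]).map(&:name)
```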

 

Finally we need a migration to create the delayed_job_groups table and add the new attributes to the delayed_jobs table:

 

Notice this migration uses PostgreSQL partial indexes to only index jobs that are unblocked and haven't failed. You'll need to change this if your database doesn't support partial indexes.
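
To make the portability point concrete, here is a small sketch that builds the index statement with and without the partial-index predicate. The create_index_sql helper and the index name are hypothetical, and the indexed columns (priority, run_at) are an assumption based on Delayed Job's standard table layout; the fallback shown is just one option, a plain index over all rows.

```ruby
# Builds CREATE INDEX SQL, adding a WHERE clause only when the database
# supports partial indexes. Hypothetical helper for illustration only.
def create_index_sql(name:, table:, columns:, where: nil, partial_indexes: true)
  sql = "CREATE INDEX #{name} ON #{table} (#{columns.join(', ')})"
  sql += " WHERE #{where}" if where && partial_indexes
  sql
end

# Only unblocked jobs that haven't failed are worth indexing for workers.
predicate = 'failed_at IS NULL AND blocked = FALSE'

postgres_sql = create_index_sql(
  name: 'delayed_jobs_priority',
  table: 'delayed_jobs',
  columns: %w[priority run_at],
  where: predicate
)

fallback_sql = create_index_sql(
  name: 'delayed_jobs_priority',
  table: 'delayed_jobs',
  columns: %w[priority run_at],
  where: predicate,
  partial_indexes: false
)

puts postgres_sql
puts fallback_sql
```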

Job Group Plugin Implementation

We're in the homestretch now. We've created the JobGroup class and made the Job class job group aware. The only thing remaining is to hook into the Job lifecycle to make the appropriate calls on the JobGroup:
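
The idea of hooking into the lifecycle can be sketched as follows. SketchLifecycle is a simplified stand-in, not Delayed Job's lifecycle class, and TrackedGroup, GroupedJob, and JobGroupHooks are hypothetical names; the point is only that an "after perform" callback notifies the job's group.

```ruby
# Simplified stand-in for a job lifecycle with named events (hypothetical).
class SketchLifecycle
  def initialize
    @after_callbacks = Hash.new { |hash, event| hash[event] = [] }
  end

  def after(event, &callback)
    @after_callbacks[event] << callback
  end

  # Runs the event's work (the block), then fires its "after" callbacks.
  def run(event, job)
    yield job
    @after_callbacks[event].each { |callback| callback.call(job) }
  end
end

# Hypothetical group that just counts down its unfinished jobs.
class TrackedGroup
  def initialize(job_count)
    @remaining = job_count
  end

  def job_completed
    @remaining -= 1
  end

  def complete?
    @remaining.zero?
  end
end

GroupedJob = Struct.new(:name, :job_group)

module JobGroupHooks
  # After a job performs, tell its job group (if it has one).
  def self.install(lifecycle)
    lifecycle.after(:perform) do |job|
      job.job_group.job_completed if job.job_group
    end
  end
end

lifecycle = SketchLifecycle.new
JobGroupHooks.install(lifecycle)

group = TrackedGroup.new(2)
jobs = [GroupedJob.new('products', group), GroupedJob.new('relationships', group)]
jobs.each { |job| lifecycle.run(:perform, job) { |j| puts "performing #{j.name}" } }
puts group.complete?
```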

 

Finally we must register our plugin with Delayed Job:
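
In Delayed Job itself, registration is typically done by appending the plugin class to Delayed::Worker.plugins. The sketch below imitates that pattern in plain Ruby so it can run on its own; SketchWorker, JobGroupSketchPlugin, and the install method are hypothetical stand-ins rather than Delayed Job classes.

```ruby
# Hypothetical worker with a class-level plugin registry.
class SketchWorker
  def self.plugins
    @plugins ||= []
  end

  attr_reader :events

  def initialize
    @events = []
    # Each registered plugin gets a chance to hook into the worker at startup.
    self.class.plugins.each { |plugin| plugin.install(self) }
  end
end

# Hypothetical plugin; a real one would set up lifecycle callbacks here.
module JobGroupSketchPlugin
  def self.install(worker)
    worker.events << 'job group hooks installed'
  end
end

# Registration: add the plugin to the registry before workers start.
SketchWorker.plugins << JobGroupSketchPlugin

worker = SketchWorker.new
puts worker.events
```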

 

That's it!

Conclusion

In this post we showed how to use the Delayed Job plugin mechanism to group jobs together into a larger unit of work that can be blocked, canceled, or followed by additional processing once it completes. Here at Salsify we've found this abstraction very useful. Let us know if you find it useful too. If there's sufficient interest, we'll look into open sourcing our job group plugin as well as other Delayed Job plugins we've written.

Topics: Engineering
