Posted by Robert Kaufman

May 23, 2016 1:06:13 PM

At Salsify, we use Delayed Job extensively to handle asynchronous tasks. This works well for us: we can finish web requests faster, which makes for a more responsive web app, while offloading non-urgent tasks to background jobs. For the most part, Delayed Job (and similar job queuing mechanisms like Resque, Celery, etc.) provides a simple and highly effective approach for running background work, making sure that it gets done, and providing a framework to scale your compute resources to handle expected workloads. Even beyond that, these frameworks create straightforward opportunities for dynamically scaling resources to handle spikes in workload. For example, we use an excellent service called HireFire to dynamically scale our Delayed Job worker pools based on queued work, which means we can meet the needs of a changing workload while keeping our hosting costs reasonable.

But despite all of the advantages of running background jobs, under real-world usage you can still run into challenging situations that require thoughtful handling. One general class of problems that can arise is achieving fairness in resource usage across users.
Most of the time we expect users to put a roughly even workload into the system. Through some combination of solid capacity planning and dynamic scaling, we strive to have the resources available to handle all of that workload in a timely fashion. But what if one user submits a huge amount of work, work that very reasonably may end up represented as a large number of delayed jobs? In applications like Salsify, which is a rich business-facing application that offers a wide range of data processing and data integration capabilities, spikes in workload are frequent.

Under these circumstances, resource usage can become challenging despite our best resource management plans. For one thing, not all resources are easily scaled dynamically. For example, in most environments data services like databases and search indexes (such as Elasticsearch) cannot be quickly scaled up or down, which means they can inherently become a bottleneck. Furthermore, since Delayed Job runs jobs on a first-come, first-served basis, if a large set of jobs from one user is queued up, it can force subsequent jobs from other users to wait in line. And that’s not fair. Ideally, we’d like all users to get quick service even when the system is processing long-running workloads. We want fairness.
Users B and C need to wait for all of User A's jobs to finish.
The First Solution
Delayed Job provides a mechanism to change the order in which jobs are run: a job can have an integer "priority", which provides perhaps the most obvious mechanism for beginning to introduce more fairness into the system. When a worker dequeues a job, it will pick the one with the lowest number in its priority column. A naive solution is to set the priority based on the submitting user’s recent resource consumption; for example, when enqueueing a job, set the priority equal to the number of jobs that this user has enqueued within the last hour. In this approach, if a user enqueues 100 jobs, the last one will have a priority of 100, and while these are running, any user enqueuing only a few jobs will have theirs run first, as their jobs' priorities will be under 100.
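To make the scheme concrete, here is a minimal in-memory sketch in plain Ruby. It is an illustration under stated assumptions, not our production code and not Delayed Job's API: the `SketchJob` struct and the `NaivePriorityQueue` class are hypothetical names, and an array stands in for the jobs table.

```ruby
# In-memory sketch of the naive priority scheme. All names here are
# hypothetical; this is not Delayed Job's API.
SketchJob = Struct.new(:seq, :user_id, :priority)

class NaivePriorityQueue
  WINDOW_SECONDS = 3600 # "within the last hour"

  def initialize
    @pending = []                                   # jobs waiting to run
    @enqueue_times = Hash.new { |h, k| h[k] = [] }  # user_id => times of all enqueues
    @seq = 0
  end

  # Priority = number of jobs this user enqueued in the last hour,
  # counting the one being enqueued now.
  def enqueue(user_id, now: Time.now)
    cutoff = now - WINDOW_SECONDS
    recent = @enqueue_times[user_id].count { |t| t > cutoff }
    @enqueue_times[user_id] << now
    job = SketchJob.new(@seq += 1, user_id, recent + 1)
    @pending << job
    job
  end

  # Mimics the worker: lowest priority number first, oldest first on ties.
  def dequeue
    job = @pending.min_by { |j| [j.priority, j.seq] }
    @pending.delete(job) if job
    job
  end
end

queue = NaivePriorityQueue.new
now = Time.now
100.times { queue.enqueue(:user_a, now: now) }  # priorities 1..100
3.times   { queue.enqueue(:user_b, now: now) }  # priorities 1..3

puts Array.new(6) { queue.dequeue.user_id }.inspect
```

In this run, the three jobs from `user_b` interleave with the first three jobs from `user_a` instead of waiting behind all 100 of them.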

This algorithm lowers the priority of large job sets enqueued at one time by an individual user, allowing jobs from other users to be picked up first. However, in some cases this simplistic approach goes too far. For users who enqueue many jobs at once, the last of those jobs would have a very low priority and could be delayed indefinitely by every other user who enqueues new jobs (assuming new jobs continue to arrive). For example, suppose the application is having a very high-traffic day, and a large user enqueues 1000 jobs. The last job will have priority 1000. Now imagine there are many other users using the system, all enqueueing new jobs, and the workers are barely able to keep up with them. The workers will not get to the large user's later jobs until some of these other users sign off and free up resources.

A second problem is that a user's jobs could sometimes be run in a different order than they were enqueued. This will happen if a large batch is still in progress after an hour and new jobs are then enqueued: the one-hour window no longer counts the old batch, so the new jobs get higher priorities (lower numbers) than the older jobs still waiting. This is unlikely to happen in practice, but we can easily imagine it. For example, one of our common jobs is to reindex a user's products in Elasticsearch, and when a user views a product, that data is retrieved from Elasticsearch. Suppose a user enqueues many long-running jobs, and then immediately edits a product. That product's reindex job will have a very low priority. Now suppose that an hour later, the user edits a second product. Because an hour has passed, the job to reindex the second product has a high priority and will run before the earlier-enqueued reindexing job. As a result, the user would see current data for the product they just edited and stale data for the first product. Our jobs are designed to withstand being run out of order, so the end result will be the same, but it could be confusing to the user at the time.
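The reordering can be worked through with a few numbers. The sketch below assumes the priority is "jobs this user enqueued in the last hour, plus the new one", uses plain integer seconds for time, and picks a hypothetical batch of 500 jobs; `naive_priority` is an illustrative helper, not part of Delayed Job.

```ruby
ONE_HOUR = 3600 # seconds

# Naive priority: jobs this user enqueued in the last hour, plus the new one.
# Times are plain integer seconds to keep the sketch simple.
def naive_priority(earlier_enqueue_times, now)
  earlier_enqueue_times.count { |t| t > now - ONE_HOUR } + 1
end

history = Array.new(500, 0)                  # 500 long-running jobs at t = 0
first_edit = naive_priority(history, 1)      # reindex job for the first product
history << 1
second_edit = naive_priority(history, 3700)  # a bit over an hour later

puts "first edit priority:  #{first_edit}"   # 501
puts "second edit priority: #{second_edit}"  # 1
```

If the first reindex job is still waiting at that point, the second one (priority 1) is picked up ahead of it (priority 501), even though it was enqueued later.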
The Second Solution
We wanted a more robust solution, and took some inspiration from Fair Queueing algorithms, which are used in process and network schedulers. While there is some debate over what should be considered fair, the general idea is that each user is entitled to an equal share of a resource (in this case, our worker processes).

We decided that we could use a round-robin algorithm when dequeueing jobs: every time a worker dequeued a job, it would first check which user's turn it was, and then run one of that user's jobs. That way, every user would get a fair share of our worker resources. For example, if there were 5 available workers and 5 users each with jobs enqueued, each user would get one of their jobs run, regardless of the order in which the jobs were enqueued. This is in contrast to the other algorithms, where it would be possible for a single user to use every available worker.
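A round-robin dequeue along these lines can be sketched in a few lines of plain Ruby. This is a simplified in-memory illustration, assuming one ordered list of pending jobs per user; `RoundRobinQueue` and its methods are hypothetical names, not Delayed Job's API.

```ruby
# In-memory sketch of round-robin dequeueing across users.
class RoundRobinQueue
  def initialize
    # user_id => that user's pending jobs, oldest first.
    # Ruby hashes keep insertion order, which doubles as the rotation order.
    @queues = {}
  end

  def enqueue(user_id, job)
    (@queues[user_id] ||= []) << job
  end

  # Take the user at the front of the rotation, hand out their oldest job,
  # and move them to the back if they still have work queued.
  def dequeue
    user_id, jobs = @queues.first
    return nil if user_id.nil?

    @queues.delete(user_id)
    job = jobs.shift
    @queues[user_id] = jobs unless jobs.empty? # re-insert at the end
    [user_id, job]
  end
end

queue = RoundRobinQueue.new
%w[a1 a2 a3 a4].each { |job| queue.enqueue(:user_a, job) }
queue.enqueue(:user_b, "b1")
queue.enqueue(:user_c, "c1")

while (user_id, job = queue.dequeue)
  puts "#{user_id} runs #{job}"
end
```

Here `user_b` and `user_c` each get a turn right after the first job from `user_a`, rather than waiting for all four of that user's jobs.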

We were happy with this solution, but it is not perfect. Really, the scarce resource to be shared is worker processing time, and we are not necessarily dividing that equally. What we are dividing equally is the number of jobs started. The two differ because jobs take differing amounts of time to run. Ideally, we would know how long each job would take to run, and allocate worker resources to evenly distribute worker processing time, but we currently don't have a way of predicting how long a job will take. We don't believe this will have a substantial impact on fairness, but it will be something to watch out for.
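A tiny worked example shows the gap between the two measures. The durations below are made-up numbers chosen for illustration, and `share_summary` is a hypothetical helper.

```ruby
# Compare "jobs started" with "worker seconds used" per user.
# Input: user_id => list of job durations in seconds (hypothetical values).
def share_summary(durations_by_user)
  durations_by_user.map do |user, durations|
    [user, { jobs_started: durations.size, worker_seconds: durations.sum }]
  end.to_h
end

summary = share_summary({ user_a: [60, 60, 60], user_b: [1, 1, 1] })

summary.each do |user, stats|
  puts "#{user}: #{stats[:jobs_started]} jobs, #{stats[:worker_seconds]} worker seconds"
end
```

Both users start three jobs, so round-robin treats them equally, yet `user_a` consumes 180 worker seconds against 3 for `user_b`.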


A naive round-robin algorithm would take a list of all users in our system, and in turn try to run a job for that user. But most users don't have any jobs enqueued most of the time, so that would result in a lot of SQL queries with no results. Instead, our list should only include users who have one or more jobs in the queue. But that list can potentially change every time we add or remove a job from the queue. We would also need to keep track of this list of users outside of our worker processes, which adds another dependency to the worker processes. We realized we could achieve the same fairness effect by picking a random user who has a job in the queue each time we dequeued a job. By doing so, there is less state to keep track of.
Randomized round-robin: Each user's earliest-inserted job has an equal chance of being run next.
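The random pick can be sketched in plain Ruby as follows. This is an in-memory illustration only: the real change lives in the SQL that reserves a job, and `PendingJob` and `pick_next_job` are hypothetical names. It assumes job ids grow with insertion order.

```ruby
PendingJob = Struct.new(:id, :user_id) # id grows with insertion order

# Pick a random user among those with pending jobs,
# then return that user's earliest-inserted job.
def pick_next_job(pending_jobs, rng: Random.new)
  user_ids = pending_jobs.map(&:user_id).uniq
  return nil if user_ids.empty?

  chosen_user = user_ids.sample(random: rng)
  pending_jobs.select { |job| job.user_id == chosen_user }.min_by(&:id)
end

# One user with 98 queued jobs, two users with a single job each.
jobs = (1..98).map { |i| PendingJob.new(i, :user_a) }
jobs << PendingJob.new(99, :user_b) << PendingJob.new(100, :user_c)

rng = Random.new(42)
picks = Hash.new(0)
3000.times { picks[pick_next_job(jobs, rng: rng).user_id] += 1 }
picks.each { |user, count| puts "#{user}: #{count}" }
```

Because the draw is over users rather than over jobs, each of the three users should be chosen roughly a third of the time, even though one of them owns 98 of the 100 queued jobs. No rotation state has to be stored between dequeues.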

Unlike with the built-in priority column, Delayed Job doesn't provide a super simple way to hook in where we want. We want to change the logic that a worker uses to pick up a new job to run. This is done in SQL, in the Delayed::Worker.reserve method, but this method also runs the query to actually reserve the job. Here's a simplified version of what it looks like in version 4.1.0 of the delayed_job_active_record gem:
Now, here's a simplified version of the changes we made.
The heart of this code is in lines 23-32, which says, essentially, "select a random user ID from the list of eligible IDs". The rest of the code handles details like ensuring we only get a job with the highest priority, and that is not locked by another worker, etc.

To keep this to a reasonable size, I've removed a few things, like dealing properly with workers that should only work on certain queues. Also note that while we could simply put the full SQL statement in the code here, we instead have a class for building the query, which is meant to make this easier to understand and modify in the future.

We used to occasionally have users write to us complaining about slow performance for some feature of the product. But since deploying these changes, we no longer get these messages.

Topics: Ruby, Ruby on Rails, Delayed Job, queues, software engineering
