Handling Timeouts in RQ

July 6, 2016
python development

An important part of building scalable applications and systems is to offload non-urgent tasks to worker processes. We want to do this in order to limit the time a user is waiting on requests, and prevent our application servers from getting bogged down with non-urgent tasks.

Recently I was migrating a selection of long running processes (data export operations to be specific) to background processes, with two main goals:

  1. Preventing request timeouts that some of our clients had been seeing
  2. Move this load off of our application servers

Imagine my surprise when, after moving these tasks completely out of process, I found a JobTimeoutException in our logs. Again, we have three core concerns this:

  1. We shouldn’t be seeing timeouts in worker processes
  2. These exceptions are being raised from arbitrary places
  3. Most importantly; our exception handlers were not handling these

#1: We shouldn’t be seeing timeouts in worker processes

When setting up our RQ Queue, we specify a timeout of 2 hours. My naive assumption was that this meant 2 hours with no activity. As it turns out, this is wrong. What it really means is 2 hours max job duration. This was the (truncated) stack trace that pointed us in the right direction (note that I’ve truncated some path names, and the beginning of the stack trace):

...
File ".../venv/local/lib/python2.7/site-packages/sqlalchemy/orm/state.py", line 75, in __init__
    def __init__(self, obj, manager):
File ".../venv/local/lib/python2.7/site-packages/rq/timeouts.py", line 51, in handle_death_penalty
    'value ({0} seconds)'.format(self._timeout))
JobTimeoutException: Job exceeded maximum timeout value (7200 seconds)

#2: Exceptions being raised from arbitrary places

This is where #2 came up. This RQ exception is being raised out of SQLAlchemy, which at first seems very strange, but digging into the rq source (by following the stack trace) gives us some answers.

In rq/timeouts.py we can find the following:

class UnixSignalDeathPenalty(BaseDeathPenalty):

  def handle_death_penalty(self, signum, frame):
    raise JobTimeoutException(
            'Job exceeded maximum timeout '
            'value ({0} seconds)'.format(self._timeout))

  def setup_death_penalty(self):
    """
    Sets up an alarm signal and a signal handler
    that raises a JobTimeoutException after the
    timeout amount (expressed in seconds).
    """
    signal.signal(signal.SIGALRM, self.handle_death_penalty)
    signal.alarm(self._timeout)

  def cancel_death_penalty(self):
    """
    Removes the death penalty alarm and puts
    back the system into default signal handling.
    """
    signal.alarm(0)
    signal.signal(signal.SIGALRM, signal.SIG_DFL)

which inherits its __enter__ and __exit__, so it can be used as a context manager.

It’s then used in rq/worker.py to wrap the actual job execution:

with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):
    rv = job.perform()

UnixSignalDeathPenalty causes a SIGALRM to be raised into the processes after a give number of seconds, independent of what is currently be executed. By also binding an rq function to the SIGALRM signal, it allows us to raise an exception from basically anywhere. This explains why the exception appeared to be being raised from inside SQLAlchemy.

So awesome, one problem solved, however we should still be able to let RQ know that we’re still working to prevent the job being killed, thus taking care of problem #1, right? Wrong. This does not appear to be supported in RQ currently, however adding it in shouldn’t be too difficult, maybe I’ll take a shot at it. If I do I will totally update it here. In the meantime, if anyone happens to have a better approach, please let me know.

#3: Exception handlers were not handling these exceptions

Solving problem #3 actually turned out to be not nearly as exciting. Again, there were two main problems.

First, I had been using job.meta to pass information between handlers, however was using rq.get_current_job() to get the job to set the meta on, and then attempting to access the meta off of the job object passed into the handler. Turns out, these are not the same object ¯\_(ツ)_/¯. But don’t worry, quick fix: 1) call job.save() after setting the meta, and job.refresh() in the exception handler before accessing it.

Second, I had made the naive mistake of wrapping my entire job handler in a try/except with an overly broad except Exception:. This was swallowing all of my errors, which prevented the registered exc_handlers from being used. Protip: except Exception: is almost always a bad idea.

How to properly handle these

Now that we aren’t swallowing our errors, we are appropriately getting into our error handlers, and can add one to handle our JobTimeoutException appropriately. Ie;

def timeout_handler(job, exc_type, exc_value, traceback):
  if isinstance(exc_value, JobTimeoutException):
    print "We caught a timeout!"
    print "<your timeout behaviour here>"
    # return True to stop chaining exc handlers
    return True
  return False

with rq.Connections(redis_conn):
  worker = rq.Worker([queue])
  worker.push_exc_handler(timeout_handler)

  worker.work()

By checking the exc_value type, we can clearly and easily add a handler which will only deal with these timeout exceptions.

Summary

This post covered alot, but the main points can be summarized as follows:

  • RQ timeouts are the max job duration, not the max time with no activity
  • RQ timeouts use SIGALRM’s, and as such can be raised from anywhere
  • Using except Exception blocks is almost always a bad idea
  • We can easily add an RQ exception handler specifically for JobTimeoutExceptions

Hopefully this works out for you, or was at least helpful. If you have any questions, don’t hesitate to shoot me an email, or follow me on twitter @nrmitchi.