Triggering Opbeat errors from Python RQ Workers

Introduction

At the time of writing, getting rq workers talking to Opbeat requires a custom rq exception handler, added to the chain of handlers the worker accepts. This lets us call Opbeat synchronously and avoid the current issues with the opbeat library's async workers.

Both of the examples below keep rq's original exception handler behaviour of moving a failed job to the failed queue, but this is optional: if you only want Opbeat logging, simply don't pass the move_to_failed_queue handler.

Note that we send a lot of extra information about the job itself (see _get_job_details), but this is not a requirement; the stack trace you get is useful enough without it.

With django-rq (>=0.9.5)

If using django-rq, insert the custom handler via the RQ_EXCEPTION_HANDLERS setting in your normal Django settings file, e.g.

RQ_EXCEPTION_HANDLERS = (
    'python.module.path.to.log_to_opbeat',  # Module string to above handler.
    'rq.handlers.move_to_failed_queue', # Module string to default rq handler.
)
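To check the handler is wired up, one option is to enqueue a job that always raises and run the worker with django-rq's rqworker management command; the failure should then show up in Opbeat and on the failed queue. A minimal sketch (always_fail is a hypothetical task, not part of this gist, and must live in a module the worker can import):

import django_rq

def always_fail():
    # Hypothetical task used only to exercise the exception handlers.
    raise RuntimeError("Deliberate failure to test Opbeat logging")

django_rq.enqueue(always_fail)
# Then run the worker, e.g.: python manage.py rqworker default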

With rq (>=0.8.2)

If using rq directly, pass the exception handler functions straight into the Worker's __init__, like this:

from rq.worker import Worker
from rq.handlers import move_to_failed_queue
from my_program.rq_handlers import log_to_opbeat

w = Worker(queues, exception_handlers=[log_to_opbeat, move_to_failed_queue])
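For completeness, here is a minimal sketch of wiring that up end to end; the Redis connection details and queue name are placeholder assumptions, not part of the original gist:

from redis import Redis
from rq import Queue
from rq.worker import Worker
from rq.handlers import move_to_failed_queue
from my_program.rq_handlers import log_to_opbeat

# Placeholder connection details -- adjust for your environment.
redis_conn = Redis(host='localhost', port=6379)
queues = [Queue('default', connection=redis_conn)]

w = Worker(queues,
           connection=redis_conn,
           exception_handlers=[log_to_opbeat, move_to_failed_queue])
w.work()  # Blocks, processing jobs and running the handler chain on failures.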
"""
RQ Custom Exception Handlers
"""
import logging
from django.conf import settings
from opbeat import Client as OpbeatClient
from opbeat.contrib.django.models import get_client_config
logger = logging.getLogger(__name__)
def _get_job_details(job):
"""Retrieve set of job information to send to Opbeat.
NB: Unfortunately the opbeat library kills any ordering so
these will come through jumbled, even using an OrderedDict.
Args:
job: the rq.Job object.
Returns:
dict, the title:value data to send.
"""
return dict((
('Job Description', job.description),
('Job Args', job.args),
('Job Kwargs', job.kwargs),
('Job Created at', job.created_at),
('Job Ended at', job.ended_at),
('Job Enqueued at', job.enqueued_at),
('Job Started at', job.started_at),
('Job Function', job.func_name),
('Job Key', job.key),
('Job Meta', job.meta),
('Job Origin', job.origin),
('Job Timeout', job.timeout),
))
def _log_to_opbeat(job, *exc_info):
"""Logs a job failure to Opbeat, with stacktrace & job info.
This function abides by the RQ exception handler spec.
Args:
job: the rq.Job object.
exc_info: the exception info including traceback.
Returns:
None - so the exception chain can continue.
"""
opbeat_config = get_client_config()
opbeat_config['async_mode'] = False
opbeat_client = OpbeatClient(**opbeat_config)
job_details = _get_job_details(job)
opbeat_client.capture_exception(exc_info=exc_info, extra=job_details)
return
def log_to_opbeat(job, *exc_info):
try:
return _log_to_opbeat(job, *exc_info)
except:
# Catch all exceptions, log, and then let rq fallthrough
# to the next exception handler by returning nothing.
logger.exception("Logging to Opbeat failed")
return
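As the docstrings note, rq treats a handler return value of None (or True) as "fall through to the next handler in the chain", while False stops further handling. A small sketch of a handler that uses this, under the assumption that you have some exception type you never want reported (IgnorableError is hypothetical), might look like:

class IgnorableError(Exception):
    """Hypothetical exception type we choose not to report."""


def ignore_known_errors(job, *exc_info):
    """Stop the handler chain for exceptions we don't want reported."""
    exc_type = exc_info[0]
    if issubclass(exc_type, IgnorableError):
        return False  # False tells rq not to call any further handlers.
    return None       # None (or True) falls through to the next handler.

Placed before log_to_opbeat in the handler list, this would keep ignorable failures out of Opbeat and the failed queue while everything else is handled as above.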