Skip to content

Instantly share code, notes, and snippets.

@calind
Forked from spjwebster/rqretryworker.py
Last active August 29, 2015 14:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save calind/33114d35ed13f8afed92 to your computer and use it in GitHub Desktop.
Save calind/33114d35ed13f8afed92 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import os, sys
sys.path.append(os.getcwd())
import logging
import rq
MAX_FAILURES = 3
logger = logging.getLogger(__name__)
queues = None
def retry_handler(job, exc_type, exc_value, traceback):
job.meta.setdefault('failures', 0)
job.meta['failures'] += 1
# Too many failures
if job.meta['failures'] >= MAX_FAILURES:
logger.warn('job %s: failed too many times times - moving to failed queue' % job.id)
job.save()
return True
# Requeue job and stop it from being moved into the failed queue
logger.warn('job %s: failed %d times - retrying' % (job.id, job.meta['failures']))
for queue in queues:
if queue.name == job.origin:
queue.enqueue_job(job, timeout=job.timeout)
return False
# Can't find queue, which should basically never happen as we only work jobs that match the given queue names and
# queues are transient in rq.
logger.warn('job %s: cannot find queue %s - moving to failed queue' % (job.id, job.origin))
return True
with rq.Connection():
queues = map(rq.Queue, sys.argv[1:]) or [rq.Queue()]
worker = rq.Worker(queues)
worker.push_exc_handler(retry_handler)
worker.work()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment