Skip to content

Instantly share code, notes, and snippets.

@Henrilin28
Created December 22, 2017 06:44
Show Gist options
  • Save Henrilin28/84e0e14ec57acc15fdda1f71c4973c55 to your computer and use it in GitHub Desktop.
Save Henrilin28/84e0e14ec57acc15fdda1f71c4973c55 to your computer and use it in GitHub Desktop.
RQ custom worker
from rq.worker import Worker, WorkerStatus, StopRequested
from rq.utils import make_colorizer
from rq.logutils import setup_loghandlers
from rq import Queue
class CustomWorker(Worker):
    """RQ worker with a customised work loop.

    The loop differs from a plain dequeue-and-execute loop in that it stops
    as soon as the worker reports itself busy, or when the worker's current
    job exists and is no longer in the ``queued`` state.
    """

    def __init__(self, queues, name=None, default_result_ttl=None,
                 connection=None, exc_handler=None, exception_handlers=None,
                 default_worker_ttl=None, job_class=None, queue_class=None):
        """Initialise the worker, forwarding every argument to ``Worker``.

        All parameters are passed through unchanged except ``queue_class``.
        """
        # NOTE(review): the caller-supplied ``queue_class`` is discarded and
        # ``rq.Queue`` is always used. Kept as-is because callers may rely on
        # it -- confirm this override is intentional.
        queue_class = Queue
        super(CustomWorker, self).__init__(
            queues, name=name, default_result_ttl=default_result_ttl,
            connection=connection, exc_handler=exc_handler,
            exception_handlers=exception_handlers,
            default_worker_ttl=default_worker_ttl, job_class=job_class,
            queue_class=queue_class)

    def work(self, burst=False, logging_level="INFO"):
        """Start the work loop.

        Pops and performs jobs from the worker's queues. When the queues are
        empty the loop blocks waiting for new jobs, unless ``burst`` is true,
        in which case it exits once nothing is left to dequeue.

        The loop also exits when a stop was requested, when the worker state
        is busy, or when the worker has a current job whose status is not
        ``queued``.

        Args:
            burst: If true, quit once no job can be dequeued instead of
                blocking for more work.
            logging_level: Level name handed to ``setup_loghandlers``.

        Returns:
            True if at least one job was executed, False otherwise.
        """
        # Imported here so the method is self-contained: the module's import
        # block never brought VERSION into scope.
        from rq.version import VERSION

        # Colorizer for the queue list in the start-up log line; built from
        # the module-level ``make_colorizer`` import.
        green = make_colorizer('darkgreen')

        setup_loghandlers(logging_level)
        self._install_signal_handlers()

        did_perform_work = False
        self.register_birth()
        self.log.info('pienso worker started.')
        self.log.info("RQ worker {0!r} started, version {1}".format(self.key, VERSION))
        self.set_state(WorkerStatus.STARTED)
        qnames = self.queue_names()
        self.log.info('*** Listening on %s...', green(', '.join(qnames)))

        try:
            while True:
                try:
                    self.check_for_suspension(burst)

                    if self.should_run_maintenance_tasks:
                        self.clean_registries()

                    if self._stop_requested:
                        self.log.info('Stopping on request')
                        break

                    # Do not pick up more work while marked busy.
                    if self.state == WorkerStatus.BUSY:
                        break

                    # The worker may have no current job at all, so guard
                    # against None before inspecting the job's status.
                    # get_status() is used in place of the ``status``
                    # property, which RQ is understood to have deprecated.
                    current_job = self.get_current_job()
                    if current_job is not None and current_job.get_status() != 'queued':
                        break

                    # Burst mode must not block; otherwise wait slightly less
                    # than the worker TTL (never under one second).
                    timeout = None if burst else max(1, self.default_worker_ttl - 60)

                    result = self.dequeue_job_and_maintain_ttl(timeout)
                    if result is None:
                        if burst:
                            self.log.info("RQ worker {0!r} done, quitting".format(self.key))
                        break

                    job, queue = result
                    self.execute_job(job, queue)
                    self.heartbeat()

                    did_perform_work = True

                except StopRequested:
                    break
        finally:
            # Only the parent process deregisters; the forked work horse
            # must leave the worker's registration alone.
            if not self.is_horse:
                self.register_death()

        return did_perform_work
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment