Created
December 22, 2017 06:44
-
-
Save Henrilin28/84e0e14ec57acc15fdda1f71c4973c55 to your computer and use it in GitHub Desktop.
RQ custom worker
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from rq import Queue
from rq.logutils import setup_loghandlers
from rq.utils import make_colorizer
from rq.version import VERSION
from rq.worker import StopRequested, Worker, WorkerStatus
class CustomWorker(Worker):
    """RQ worker whose work loop exits early in a few extra situations.

    The loop in :meth:`work` follows the stock RQ worker loop, with two
    additional exit conditions checked before each dequeue: the worker
    reports itself as busy, or it still has a current job whose status is
    something other than ``'queued'``.
    """

    def __init__(self, queues, name=None, default_result_ttl=None,
                 connection=None, exc_handler=None, exception_handlers=None,
                 default_worker_ttl=None, job_class=None, queue_class=None):
        """Forward all arguments to ``Worker.__init__``.

        ``queue_class`` falls back to :class:`rq.Queue` only when the caller
        did not supply one, so an explicitly passed class is honoured rather
        than silently discarded.
        """
        if queue_class is None:
            queue_class = Queue
        super(CustomWorker, self).__init__(
            queues, name=name, default_result_ttl=default_result_ttl,
            connection=connection, exc_handler=exc_handler,
            exception_handlers=exception_handlers,
            default_worker_ttl=default_worker_ttl, job_class=job_class,
            queue_class=queue_class)

    def work(self, burst=False, logging_level="INFO"):
        """Start the work loop.

        Pops and performs jobs from the current list of queues. When all
        queues are empty, blocks and waits for new jobs to arrive on any of
        the queues, unless ``burst`` mode is enabled.

        The loop stops when a stop is requested, when the worker state is
        busy, when the worker has a current job whose status is not
        ``'queued'``, or when a dequeue attempt yields nothing.

        :param burst: when true, do not block waiting for new jobs.
        :param logging_level: level passed to ``setup_loghandlers``.
        :returns: ``True`` if at least one job was executed, else ``False``.
        """
        # Colorizer used for the queue list in the startup log line.
        green = make_colorizer('darkgreen')

        setup_loghandlers(logging_level)
        self._install_signal_handlers()

        did_perform_work = False
        self.register_birth()
        self.log.info('pienso worker started.')
        self.log.info('RQ worker %r started, version %s', self.key, VERSION)
        self.set_state(WorkerStatus.STARTED)
        qnames = self.queue_names()
        self.log.info('*** Listening on %s...', green(', '.join(qnames)))

        try:
            while True:
                try:
                    self.check_for_suspension(burst)

                    if self.should_run_maintenance_tasks:
                        self.clean_registries()

                    if self._stop_requested:
                        self.log.info('Stopping on request')
                        break

                    if self.get_state() == WorkerStatus.BUSY:
                        break

                    # get_current_job() returns None when the worker has no
                    # job assigned (always the case on the first pass), so
                    # only inspect the status when a job actually exists.
                    current_job = self.get_current_job()
                    if (current_job is not None
                            and current_job.get_status() != 'queued'):
                        break

                    # Wake up before the worker key's TTL runs out so it can
                    # be refreshed; in burst mode never block.
                    timeout = None if burst else max(1, self.default_worker_ttl - 60)

                    result = self.dequeue_job_and_maintain_ttl(timeout)
                    if result is None:
                        if burst:
                            self.log.info('RQ worker %r done, quitting', self.key)
                        break

                    job, queue = result
                    self.execute_job(job, queue)
                    self.heartbeat()

                    did_perform_work = True
                except StopRequested:
                    break
        finally:
            # Only the parent worker process deregisters itself; a forked
            # work horse must leave the registration alone.
            if not self.is_horse:
                self.register_death()
        return did_perform_work
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment