@lechup
Last active January 27, 2023 13:52
GeventWorker class to use with rq. All credit goes to https://github.com/jhorman, from the comment at https://github.com/nvie/rq/issues/303#issuecomment-45304465.
from __future__ import absolute_import
import signal
import gevent
import gevent.pool
from rq import Worker
from rq.timeouts import BaseDeathPenalty, JobTimeoutException
from rq.worker import StopRequested, green, blue
from rq.exceptions import DequeueTimeout


class GeventDeathPenalty(BaseDeathPenalty):
    def setup_death_penalty(self):
        exception = JobTimeoutException('Gevent Job exceeded maximum timeout value (%d seconds).' % self._timeout)
        self.gevent_timeout = gevent.Timeout(self._timeout, exception)
        self.gevent_timeout.start()

    def cancel_death_penalty(self):
        self.gevent_timeout.cancel()


class GeventWorker(Worker):
    death_penalty_class = GeventDeathPenalty

    def __init__(self, *args, **kwargs):
        pool_size = 20
        if 'pool_size' in kwargs:
            pool_size = kwargs.pop('pool_size')
        self.gevent_pool = gevent.pool.Pool(pool_size)
        super(GeventWorker, self).__init__(*args, **kwargs)

    def get_ident(self):
        return id(gevent.getcurrent())

    def _install_signal_handlers(self):
        def request_force_stop():
            self.log.warning('Cold shut down.')
            self.gevent_pool.kill()
            raise SystemExit()

        def request_stop():
            if not self._stopped:
                gevent.signal(signal.SIGINT, request_force_stop)
                gevent.signal(signal.SIGTERM, request_force_stop)

                self.log.warning('Warm shut down requested.')
                self.log.warning('Stopping after all greenlets are finished. '
                                 'Press Ctrl+C again for a cold shutdown.')

                self._stopped = True
                self.gevent_pool.join()

        gevent.signal(signal.SIGINT, request_stop)
        gevent.signal(signal.SIGTERM, request_stop)

    def execute_job(self, job):
        self.gevent_pool.spawn(self.perform_job, job)

    def dequeue_job_and_maintain_ttl(self, timeout):
        if self._stopped:
            raise StopRequested()

        result = None
        while True:
            if self._stopped:
                raise StopRequested()

            self.heartbeat()

            while self.gevent_pool.full():
                gevent.sleep(0.1)
                if self._stopped:
                    raise StopRequested()

            try:
                result = self.queue_class.dequeue_any(self.queues, 5, connection=self.connection)
                if result is None and timeout is None:
                    self.gevent_pool.join()
                if result is not None:
                    job, queue = result
                    self.log.info('%s: %s (%s)' % (green(queue.name),
                                                   blue(job.description), job.id))
                    break
            except DequeueTimeout:
                pass

        self.heartbeat()
        return result

@lechup

lechup commented Jun 20, 2014

I don't know why, but this somehow blocks Ctrl+C when run from the command line with rqworker -w apps.main.utils.rqworker.GeventWorker. How can I break the loop with Ctrl+C?

@jhorman

jhorman commented Jun 26, 2014

The problem with rqworker is that it doesn't apply the gevent monkey patching. If I stick

import gevent.monkey
gevent.monkey.patch_all()

at the top of the rqworker file, it works. I don't use rqworker myself; I use a custom main method.
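
A custom main method along those lines might look roughly like the sketch below. This is only an illustration, not the entry point actually used here: the module name gevent_worker, the queue name "default" and the local Redis connection are all assumptions. The one point it is meant to show is that the monkey patching runs before redis and rq are imported.

```python
"""Hypothetical entry point (e.g. run_gevent_worker.py); not part of the gist."""


def main(queue_names=("default",), pool_size=20):
    # Patch the standard library first, before redis or rq are imported,
    # so that their sockets cooperate with gevent.
    import gevent.monkey
    gevent.monkey.patch_all()

    from redis import Redis
    from rq import Queue

    # Assumption: the gist above has been saved as gevent_worker.py.
    from gevent_worker import GeventWorker

    connection = Redis()  # assumes a Redis server on localhost:6379
    queues = [Queue(name, connection=connection) for name in queue_names]
    worker = GeventWorker(queues, connection=connection, pool_size=pool_size)
    worker.work()
```

Calling main() from a small script, or behind the usual if __name__ == "__main__": guard, would start the worker without going through rqworker at all.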

I forked your gist and made some changes to properly support shutdown, I think. One thing I changed: I ignore timeout in dequeue_job_and_maintain_ttl because the default is to wait 360 seconds, which means a warm shutdown doesn't happen quickly enough. I don't think it matters much, as this function loops until it gets a job anyway.
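
The idea of polling with a short timeout so that a stop request is noticed quickly can be shown without rq or gevent. The sketch below swaps in the standard library's queue.Queue and threading.Event as stand-ins for the Redis queue and the worker's _stopped flag; the function name and the poll interval are made up for illustration.

```python
import queue
import threading
import time

POLL_SECONDS = 0.05  # hypothetical short poll interval (the gist uses 5 seconds)


def dequeue_until_stopped(jobs, stop_event, poll_seconds=POLL_SECONDS):
    """Return the next job, or None once stop_event has been set."""
    while not stop_event.is_set():
        try:
            # Block only briefly, so the stop flag is re-checked often.
            return jobs.get(timeout=poll_seconds)
        except queue.Empty:
            continue
    return None


if __name__ == "__main__":
    jobs = queue.Queue()
    stop_event = threading.Event()
    # Simulate a warm shutdown request arriving while the queue is empty.
    threading.Timer(0.2, stop_event.set).start()
    started = time.monotonic()
    result = dequeue_until_stopped(jobs, stop_event)
    print(result, "after %.2fs" % (time.monotonic() - started))
```

With one long blocking wait, the loop would only see the stop request after that wait expired. With a short poll the delay is bounded by the poll interval.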

@lechup

lechup commented Jul 6, 2014

Thanks for the updates and explanations!

@zhangliyong

I don't understand why the code uses

result = self.queue_class.dequeue_any(self.queues, 5, connection=self.connection)
if result is None and timeout is None:
    self.gevent_pool.join()

and not the default:

result = self.queue_class.dequeue_any(self.queues, timeout, connection=self.connection)

As @jhorman said, is the shutdown signal ignored while dequeue_any is being called?

@Perlence

How do you terminate a job, but not the whole process?

For example, suppose a job spawns greenlets of its own and a fatal error occurs in one of them, making the entire job pointless. How can this job be removed from the pool?

@zhangliyong

I made some modifications and packed it into a package. @lechup, is that OK? This is the repo: https://github.com/zhangliyong/rq-gevent-worker
