Instantly share code, notes, and snippets.

@jmhobbs /README.md
Last active Apr 12, 2018

Embed
What would you like to do?
Delayed queues for RQ.

Example Run

RQ Worker

✪ rqworker --db 10 default high
16:06:30 RQ worker started, version 0.3.7
16:06:30 
16:06:30 *** Listening on default, high...
16:06:49 high: jobs.multiply(5, 2) (2df52ba2-bd32-4849-a8e1-c5241c78b542)
16:06:49 Job OK, result = 10
16:06:49 Result is kept for 500 seconds.
16:06:49 
16:06:49 *** Listening on default, high...
16:07:14 default: jobs.add(7, 13) (44bf19a1-7793-4ef9-81e2-f9cece8a966d)
16:07:14 Job OK, result = 20
16:07:14 Result is kept for 500 seconds.
16:07:14 
16:07:14 *** Listening on default, high...
16:08:32 Warm shut down requested.
✪

Enqueue "Daemon"

✪ python enqueue_delayed_jobs.py 
Enqueued 0 Jobs.
Enqueued 0 Jobs.
Enqueued 0 Jobs.
Enqueued 1 Jobs.
Enqueued 0 Jobs.
Enqueued 0 Jobs.
Enqueued 0 Jobs.
Enqueued 0 Jobs.
Enqueued 1 Jobs.
Enqueued 0 Jobs.
Shutting Down
✪

Delay Some Jobs

✪ python delay_jobs.py
Delay jobs.add(7, 13) on default queue for 30 seconds.
Delay jobs.multiply(5, 2) on high queue for 5 seconds.
✪
# -*- coding: utf-8 -*-
try:
import cPickle as pickle
except ImportError:
import pickle
import time
import uuid
from rq import Queue
class DelayedJob(object):
    '''Schedule RQ jobs to be enqueued after a delay.

    Pending jobs are stored as pickled payloads in the Redis sorted set
    ``queue:delayed``, scored by the Unix timestamp at which they become
    due.  Something must call ``enqueue_delayed_jobs`` periodically to move
    due jobs onto their real RQ queues.
    '''

    # Redis sorted-set key that holds the pending (not yet due) jobs.
    DELAYED_KEY = 'queue:delayed'

    def __init__(self, redis):
        '''Remember the Redis client used for storage and for RQ queues.'''
        self.redis = redis

    def _now(self):
        '''Get the current time, as an integer UTC (Unix epoch) timestamp.'''
        # time.time() is already seconds since the epoch.  The previous
        # time.mktime(time.gmtime()) treated the UTC struct as *local* time
        # and was therefore skewed by the host's UTC offset.  Entries that
        # were scheduled by the old code on a non-UTC host keep that skew.
        return int(time.time())

    def delay(self, queue, job, seconds, *args, **kwargs):
        '''Delay a queue job by a number of seconds.

        queue   -- name of the RQ queue the job is eventually enqueued on.
        job     -- the job to run; it is pickled, so it must be picklable.
        seconds -- how long to wait from now before the job is due.
        Remaining positional and keyword arguments are passed to the job.
        '''
        payload = pickle.dumps({
            'job': job,
            'queue': queue,
            'args': args,
            'kwargs': kwargs,
            # Unique id so identical calls do not collapse into a single
            # sorted-set member.
            'id': uuid.uuid1().hex,
        })
        due_at = self._now() + seconds
        # NOTE(review): (key, member, score) is the argument order this
        # file has always used.  Other redis-py clients/versions reportedly
        # reject it with "ERR value is not a valid float" -- confirm the
        # order against the client class actually in use.
        self.redis.zadd(self.DELAYED_KEY, payload, due_at)

    def enqueue_delayed_jobs(self, now=None):
        '''Enqueue and clear out ready delayed jobs.

        now -- timestamp to treat as the current time; defaults to the
               real current time when omitted (None).
        Returns the number of jobs that were due.
        '''
        # Compare against None so that an explicit timestamp of 0 is
        # honoured instead of being silently replaced by the current time.
        if now is None:
            now = self._now()
        due_jobs = self.redis.zrangebyscore(self.DELAYED_KEY, 0, now)
        for pickled_job in due_jobs:
            # NOTE(security): pickle.loads can execute arbitrary code if the
            # payload was tampered with; only use this with a trusted Redis.
            job = pickle.loads(pickled_job)
            target = Queue(job['queue'], connection=self.redis)
            target.enqueue(job['job'], *job['args'], **job['kwargs'])
            # Remove only after a successful enqueue, so a failure above
            # leaves the job in place for the next pass.
            self.redis.zrem(self.DELAYED_KEY, pickled_job)
        return len(due_jobs)

    def seconds(self, queue, seconds, job, *args, **kwargs):
        '''Delay a job by the given number of seconds.'''
        return self.delay(queue, job, seconds, *args, **kwargs)

    def minutes(self, queue, minutes, job, *args, **kwargs):
        '''Delay a job by the given number of minutes.'''
        return self.delay(queue, job, 60 * minutes, *args, **kwargs)

    def hours(self, queue, hours, job, *args, **kwargs):
        '''Delay a job by the given number of hours.'''
        return self.delay(queue, job, 3600 * hours, *args, **kwargs)

    def days(self, queue, days, job, *args, **kwargs):
        '''Delay a job by the given number of days.'''
        return self.delay(queue, job, 86400 * days, *args, **kwargs)
# -*- coding: utf-8 -*-
from delay import DelayedJob
from redis import Redis
import jobs
# Demo script: schedule two jobs with different delays on different queues.
# Redis() is called with no arguments, so the client's default connection
# settings are used.
redis_connection = Redis()
delayed_jobs = DelayedJob(redis_connection)

# print(...) with a single argument behaves identically on Python 2 and 3,
# whereas the bare print statement is a SyntaxError on Python 3.
print("Delay jobs.add(7, 13) on default queue for 30 seconds.")
delayed_jobs.seconds('default', 30, jobs.add, 7, 13)

print("Delay jobs.multiply(5, 2) on high queue for 5 seconds.")
delayed_jobs.seconds('high', 5, jobs.multiply, 5, 2)
# -*- coding: utf-8 -*-
from delay import DelayedJob
from redis import Redis
import time
# Polling "daemon": every 5 seconds, move any due delayed jobs onto their
# RQ queues and report how many were due.
redis_connection = Redis()
delayed_jobs = DelayedJob(redis_connection)

# The whole loop sits inside the try so that Ctrl-C shuts down cleanly
# whether it arrives during the sleep or during an enqueue pass (previously
# only the sleep was guarded).
try:
    while True:
        # Single-argument print(...) works on both Python 2 and Python 3.
        print('Enqueued %d Jobs.' % delayed_jobs.enqueue_delayed_jobs())
        time.sleep(5)
except KeyboardInterrupt:
    print("Shutting Down")
# -*- coding: utf-8 -*-
def add(x, y):
    '''Example job: return the result of x + y.'''
    total = x + y
    return total
def multiply(x, y):
    '''Example job: return the result of x * y.'''
    product = x * y
    return product
argparse==1.2.1
distribute==0.6.24
python-dateutil==2.1
pytz==2013b
redis==2.7.2
rq==0.3.7
six==1.3.0
times==0.6.1
wsgiref==0.1.2
@johnwhelchel

This comment has been minimized.

johnwhelchel commented Apr 12, 2018

If anyone tries to use this and comes across an "Error: ERR value is not a valid float" error, it's because the arguments to ZADD are in the wrong order.
`self.redis.zadd('queue:delayed', self._now() + seconds, pickle.dumps({'job': job, 'queue': queue, 'args': args, 'kwargs': kwargs, 'id': uuid.uuid1().hex}))` should work.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment