Skip to content

Instantly share code, notes, and snippets.

@lost-theory
Last active August 29, 2015 14:08
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lost-theory/2e94ae20643ac32a6773 to your computer and use it in GitHub Desktop.
Save lost-theory/2e94ae20643ac32a6773 to your computer and use it in GitHub Desktop.
subclassing rq_scheduler.Scheduler for https://github.com/ui/rq-scheduler/pull/43
from datetime import datetime
import logging
logging.basicConfig(level=logging.DEBUG)
from redis import Redis
from rq_scheduler import Scheduler
from rq_scheduler.utils import to_unix
class CustomRQScheduler(Scheduler):
    """Scheduler that enqueues a new job for every scheduled run.

    Same as the core rq_scheduler.Scheduler, but with the enqueue_job method
    overridden to enqueue new jobs for each run instead of re-using the same
    job ID. This fixes race conditions around overlapping jobs and allows you
    to manage expiration of results by yourself.

    See GitHub issue for details: https://github.com/ui/rq-scheduler/pull/43

    This works with rq-scheduler==0.5.0. Be careful about running with a
    version other than that.
    """

    def enqueue_job(self, job):
        """Enqueue a new run of a scheduled job and reschedule it if needed.

        The scheduled ``job`` is used as a template: its function, arguments,
        timeout and result TTL are handed to ``queue.enqueue_call`` instead
        of the scheduled job itself being placed on the queue. The scheduled
        job's ID is then removed from the scheduler's sorted set and, when
        the job has an interval and still has repeats left, added back with
        a new due time.

        Args:
            job: The scheduled job that is due. ``job.meta`` may contain
                ``interval`` (added, as an int, to the current UTC unix
                timestamp to compute the next due time) and ``repeat``
                (number of remaining runs).

        Returns:
            None.
        """
        self.log.debug('Pushing {0} to {1}'.format(job.id, job.origin))

        interval = job.meta.get('interval', None)
        repeat = job.meta.get('repeat', None)

        # If job is a repeated job, decrement counter
        if repeat:
            job.meta['repeat'] = int(repeat) - 1
        job.enqueued_at = datetime.utcnow()
        job.save()

        # Enqueue a new job built from the scheduled one. Keyword arguments
        # keep this call independent of enqueue_call's parameter order.
        queue = self.get_queue_for_job(job)
        queue.enqueue_call(
            job.func,
            args=job.args,
            kwargs=job.kwargs,
            timeout=job.timeout,
            result_ttl=job.result_ttl,
        )
        self.connection.zrem(self.scheduled_jobs_key, job.id)

        # Jobs without an interval run once and are not rescheduled.
        if not interval:
            return

        # If this is a repeat job and counter has reached 0, don't repeat
        if repeat is not None and job.meta['repeat'] == 0:
            return

        # NOTE(review): _zadd is a private helper of the connection object as
        # used by rq-scheduler 0.5.0; confirm it exists before upgrading.
        self.connection._zadd(self.scheduled_jobs_key,
                              to_unix(datetime.utcnow()) + int(interval),
                              job.id)
if __name__ == "__main__":
    # Run the custom scheduler when executed as a script. Redis() is created
    # with no arguments, so the client library's default connection settings
    # apply. interval=5 is passed to the scheduler constructor (presumably
    # the polling interval in seconds -- confirm against rq-scheduler).
    scheduler = CustomRQScheduler(connection=Redis(), interval=5)
    scheduler.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment