Skip to content

Instantly share code, notes, and snippets.

@LucasRoesler
Created March 15, 2014 22:08
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save LucasRoesler/9574647 to your computer and use it in GitHub Desktop.
Save LucasRoesler/9574647 to your computer and use it in GitHub Desktop.
Demonstration of how to create cron-like periodic tasks using Python RQ (http://python-rq.org/), django_rq (https://github.com/ui/django-rq), and rq_scheduler (https://github.com/ui/rq-scheduler)
# put in app/management/commands
from datetime import datetime
import importlib
import logging
from optparse import make_option
from django.conf import settings
from django.core.management.base import BaseCommand
import django_rq
# Module-level logger named after this module; the management command below
# writes its --verbose output through it.
logger = logging.getLogger(__name__)
def import_function(path):
    """
    Given the fully qualified path to the function, return that function.

    ``path`` should be of the form ``module.submodule...submodule.function``:
    everything before the last period is imported as a module and the part
    after it is looked up as an attribute of that module.

    Raises ``ValueError`` when ``path`` has no module part (no period in it),
    ``ImportError`` when the module cannot be imported and ``AttributeError``
    when the module has no attribute with the given name.
    """
    # Split on the last period only: the left side is the module path, the
    # right side is the function name.
    module_path, _, function_name = path.rpartition('.')
    if not module_path:
        # import_module('') would fail with an opaque "Empty module name";
        # keep the exception type but say what is actually wrong.
        raise ValueError(
            "Expected a dotted path of the form 'module.function', "
            "got {!r}".format(path)
        )
    lib = importlib.import_module(module_path)
    return getattr(lib, function_name)
class Command(BaseCommand):
    """
    Runs RQ scheduler
    """
    # The class docstring above doubles as the command's help text (see the
    # next line), so it is kept short; details live in ``handle``.
    help = __doc__
    option_list = BaseCommand.option_list + (
        make_option(
            '--verbose',
            dest='verbose',
            action="store_true",
            default=False,
            help="If True, it will print the list of scheduled tasks.",
        ),
        make_option(
            '--queue',
            dest='queue',
            default='default',
            help='Queue that scheduled tasks will be sent to.'
        )
    )

    def handle(self, task_module=None, *args, **options):
        """
        (Re)schedule every task listed in ``settings.RQ_PERIODIC``.

        Jobs already known to the scheduler whose ``func_name`` matches a
        ``'task'`` path from the settings are cancelled first; then each
        settings entry is scheduled again, starting now and repeating
        forever at its configured interval.

        ``task_module`` is accepted for backward compatibility and is not
        used. ``options`` carries the parsed ``verbose`` and ``queue``
        command-line options.
        """
        verbose = options.get('verbose')
        queue = options.get('queue')
        scheduler = django_rq.get_scheduler(queue)

        # cancel any currently scheduled tasks that are listed in the
        # settings.
        func_names = set(entry['task'] for entry in settings.RQ_PERIODIC)
        for job in list(scheduler.get_jobs()):
            if job.func_name in func_names:
                # NOTE(review): rq-scheduler documents scheduler.cancel(job)
                # as the way to unschedule a job -- confirm that job.cancel()
                # is sufficient for the installed RQ version.
                job.cancel()

        for entry in settings.RQ_PERIODIC:
            # Work on a copy: the entries belong to the shared Django
            # settings, and popping 'task' from them in place would break a
            # second run of this command in the same process.
            schedule_kwargs = dict(entry)
            # grab the function path, remove it from the keyword arguments
            func_path = schedule_kwargs.pop('task')
            # add the function object and set repeat to None, the task will
            # repeat forever.
            schedule_kwargs.update({
                'func': import_function(func_path),
                'repeat': None,
            })
            # add the task to the schedule; rq-scheduler works with UTC
            # datetimes, so the first execution time must be given in UTC.
            scheduler.schedule(
                scheduled_time=datetime.utcnow(),
                **schedule_kwargs
            )
            # verbose logging
            if verbose:
                # Materialise the jobs so that len() works even if
                # get_jobs() returns an iterator.
                scheduled = list(scheduler.get_jobs())
                logger.info("Adding {} to scheduled tasks.".format(func_path))
                logger.info("Currently scheduled tasks ({}):\n {}".format(
                    len(scheduled),
                    '\n\t'.join([j.func_name for j in scheduled])
                ))
# Add the following to your Django settings.
# Settings describing the tasks that should be run by the rqperiodic management
# command. A list of dicts of the form
# {
# 'task': fully qualified path to the task,
# 'args': list of arguments to pass to the task,
# 'kwargs': a dict of kwargs to pass,
# 'interval': seconds between runs,
# 'timeout': task timeout, use -1 to allow unlimited time,
# 'result_ttl': seconds the result is kept,
# }
# Periodic task definitions: one dict per task. Each entry names the task by
# its dotted path and supplies the call arguments plus the interval, timeout
# and result_ttl values for that task.
RQ_PERIODIC = [
    # Task called without any arguments.
    dict(
        task='app.my_hourly_task',
        args=[],
        kwargs={},
        interval=3600,
        timeout=-1,
        result_ttl=86400,
    ),
    # Task called with a keyword argument.
    dict(
        task='app.my_hourly_task_w_args',
        args=[],
        kwargs={'name': 'Bob'},
        interval=3600,
        timeout=-1,
        result_ttl=86400,
    ),
]
# I put all of the functions that will be run via RQ into a tasks.py file, one per app.
from datetime import datetime
from django_rq import job
# use a negative timeout (-1) for tasks that should run without a time limit.
@job('default', timeout=-1)
def my_hourly_task():
    """Print a fixed test message followed by the current local time."""
    # A single-argument print(...) produces the same output whether print is
    # a statement (Python 2) or a function (Python 3).
    print("This is a simple test {}".format(datetime.now()))
@job('default')
def my_hourly_task_w_args(name=None):
    """
    Print an hourly-task message.

    When ``name`` is truthy the message is personalised with it; otherwise a
    generic message is printed.
    """
    # Single-argument print(...) works identically on Python 2 and Python 3.
    if name:
        print("{}'s hourly task".format(name))
    else:
        print("Another hourly task")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment