Skip to content

Instantly share code, notes, and snippets.

@bergundy
Created February 8, 2012 15:33
Show Gist options
  • Save bergundy/1770500 to your computer and use it in GitHub Desktop.
Save bergundy/1770500 to your computer and use it in GitHub Desktop.
import re
import itertools
import logging
from apscheduler.triggers.cron import CronTrigger
from tornado.ioloop import IOLoop
from datetime import datetime
class CronCallback(object):
"""Schedules the given callback to be called periodically.
The callback is called according to the schedule argument.
`start` must be called after the CronCallback is created.
If schedule is a string it should contain 7 cron fields: ('second', 'minute', 'hour', 'day', 'month', 'year', 'day_of_week').
If schedule is a dict it must contain at least one of the fields above.
>>> cron1 = CronCallback(lambda: logging.error('x'), dict(seconds = 1)) # logs 'x' every second
>>> cron2 = CronCallback(lambda: IOLoop.instance().stop(), '*/5 * * * * * *') # stops ioloop every 5 seconds
>>> cron1.start()
>>> cron2.start()
>>> IOLoop.instance().start()
"""
_split_re = re.compile("\s+")
_sched_seq = ('second', 'minute', 'hour', 'day', 'month', 'year', 'day_of_week')
def __init__(self, callback, schedule, io_loop=None):
if isinstance(schedule, basestring):
splitted = self._split_re.split(schedule)
if len(splitted) < 7:
raise TypeError("'schedule' argument pattern mismatch")
schedule = dict(itertools.izip(self._sched_seq, splitted))
self.callback = callback
self._trigger = CronTrigger(**schedule)
self.io_loop = io_loop or IOLoop.instance()
self._running = False
self._timeout = None
def start(self):
"""Starts the timer."""
self._running = True
self._schedule_next()
def stop(self):
"""Stops the timer."""
self._running = False
if self._timeout is not None:
self.io_loop.remove_timeout(self._timeout)
self._timeout = None
def _run(self):
if not self._running: return
try:
self.callback()
except Exception:
logging.error("Error in cron callback", exc_info=True)
self._schedule_next()
def _schedule_next(self):
if self._running:
self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run)
@property
def _next_timeout(self):
d = datetime.now()
return self._trigger.get_next_fire_time(d) - d
if __name__ == "__main__":
import doctest
doctest.testmod()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment