Skip to content

Instantly share code, notes, and snippets.

@fdemmer
Last active May 15, 2019 09:20
Show Gist options
  • Save fdemmer/7551bff2bab80b56aac5018060aded55 to your computer and use it in GitHub Desktop.
Save fdemmer/7551bff2bab80b56aac5018060aded55 to your computer and use it in GitHub Desktop.
Alternative crontab schedule for Celery
import logging
from datetime import timedelta

import pytz
import six
import tzcron
from celery import schedules
from celery.utils.time import is_naive
from kombu.utils import cached_property
from pytz import AmbiguousTimeError, NonExistentTimeError
logger = logging.getLogger(__name__)
class tzcrontab(schedules.BaseSchedule):
    """
    Crontab schedule supporting a timezone.

    Expression parsing and generation of event times is using `tzcron`:
    https://tzcron.readthedocs.io
    """

    def __init__(self, expression, tz=None, *args, **kwargs):
        """
        Create a schedule from a cron expression and an optional timezone.

        :param expression: cron expression (with year), see `tzcron` docs for details
        :param tz: timezone as string (eg. 'Europe/Vienna') or `pytz.timezone`;
            when not given, the `tz` property falls back to `self.app.timezone`
        :param args: additional positional arguments, forwarded to the base schedule
        :param kwargs: additional keyword arguments, forwarded to the base schedule
        """
        self.expression = expression
        # set 'timezone' attribute like on `self.app`, use `self.tz` property to retrieve
        # NOTE: check against `six.string_types` (not `six.text_type`), so a plain
        # `str` timezone name is also resolved on Python 2.
        if isinstance(tz, six.string_types):
            self.timezone = pytz.timezone(tz)
        else:
            self.timezone = tz
        # Forward positional arguments as well; they were accepted by the
        # signature, but silently dropped before.
        super(tzcrontab, self).__init__(*args, **kwargs)

    @cached_property
    def tz(self):
        """Return the explicitly configured timezone or the one of the app."""
        return self.timezone or self.app.timezone

    def __repr__(self):
        template = "<{}: {} @{}>"
        return template.format(self.__class__.__name__, self.expression, self.tz)

    def __eq__(self, other):
        """
        Compare expression, configured timezone and the base schedule's equality.

        Returns `NotImplemented` for objects that are not a `tzcrontab`.
        """
        if isinstance(other, tzcrontab):
            return all([
                other.expression == self.expression,
                other.timezone == self.timezone,
                super(tzcrontab, self).__eq__(other)
            ])
        return NotImplemented

    def now(self):
        """Return the base schedule's current time, which must be timezone aware."""
        now = super(tzcrontab, self).now()
        # The result is subtracted from the event times in `remaining_estimate`,
        # so a naive datetime is rejected right here.
        assert not is_naive(now), "Please, don't use naive datetimes!"
        return now

    def remaining_estimate(self, since):
        """
        Return the time left until the first event after `since`.

        :param since: datetime from which the cron schedule is generated
        :return: `timedelta` between now and the next event (negative, if the
            event is already in the past) or `None`, if `pytz` reported the
            event time as ambiguous or non-existent in the timezone
        """
        # Make a schedule from the cron expression, starting at given datetime.
        event_datetimes = tzcron.Schedule(self.expression, self.tz, since)
        logger.debug('Schedule from cron expression: %s', event_datetimes)
        try:
            # Find the next event.
            next_datetime = next(event_datetimes)
        except AmbiguousTimeError:
            logger.exception(
                "Time is ambiguous in the requested timezone! "
                "Task will not be scheduled!"
            )
            return None
        except NonExistentTimeError:
            logger.exception(
                "Time does not exist in the requested timezone! "
                "Task will not be scheduled!"
            )
            return None
        remaining_delta = next_datetime - self.now()
        logger.debug(
            'remaining_estimate: @%s, next_datetime: %s, remaining_seconds: %s',
            self.tz, next_datetime, remaining_delta.total_seconds(),
        )
        return remaining_delta

    def _remaining_seconds(self, since):
        """Return `remaining_estimate(since)` in seconds or `None`, if there is none."""
        remaining_delta = self.remaining_estimate(since)
        if remaining_delta is not None:
            return remaining_delta.total_seconds()
        return None

    def _is_due(self, remaining_seconds):
        """Return True, if the event is less than 1 second away (past or future)."""
        # lower bound at 1 second past do not trigger old events
        # upper bound at 1 second future to trigger almost due events
        is_due = -1 < remaining_seconds < 1
        return is_due

    def determine_is_due(self, since):
        """
        Return tuple of `(is_due, remaining_seconds)` for the first event after `since`.

        When no event time could be determined, the result is not due and
        `self.app.conf.beat_max_loop_interval` is used as remaining seconds.
        """
        remaining_seconds = self._remaining_seconds(since)
        # when determined time was ambiguous or non-existent
        if remaining_seconds is None:
            logger.warning('could not determine remaining_seconds')
            return False, self.app.conf.beat_max_loop_interval
        is_due = self._is_due(remaining_seconds)
        return is_due, remaining_seconds

    def is_due(self, last_run_at):
        """
        Return tuple of `(is_due, remaining_seconds)`.

        :param last_run_at: datetime of the last run, used as start of the schedule
        """
        # Remaining time is negative and close to zero, when a task is due.
        # However `last_run_at` can be very far in the past if the
        # `PersistentScheduler` was stopped for a while.
        # To prevent triggering tasks based on outdated scheduled events, we
        # limit the time to 1 second in the past (see `_is_due`).
        now = self.now()
        is_due, remaining_seconds = self.determine_is_due(last_run_at)
        if not is_due and remaining_seconds < 0:
            # The task was not due, due to it being very far in the past.
            # Calculate the next runtime from just a moment ago to find earliest due.
            since = now - timedelta(seconds=1)
            is_due, remaining_seconds = self.determine_is_due(since)
        if remaining_seconds < 1:
            # The found event is (almost) now: keep the `is_due` result, but
            # report the time until the event following it.
            since = now + timedelta(seconds=1)
            _, remaining_seconds = self.determine_is_due(since)
        # is_due == True triggers task, remaining_seconds sets time for next tick
        return is_due, remaining_seconds
class pytzcrontab(tzcrontab):
    """
    Wrapper for `tzcrontab` with more "pythonic" interface, that allows passing
    the parts of a cron expression as separate arguments.
    """

    def __init__(self, minute='*', hour='*', day_of_week='*',
                 day_of_month='*', month_of_year='*', year='*',
                 tz=None, *args, **kwargs):
        """
        Build the cron expression from its parts and initialize `tzcrontab`.

        :param minute: 0-59 or pattern, default: '*'
        :param hour: 0-23 or pattern, default: '*'
        :param day_of_week: 1-7 (Monday to Sunday) or pattern, default: '*'
        :param day_of_month: 1-31 or pattern, default: '*'
        :param month_of_year: 1-12 or pattern, default: '*'
        :param year: full year (yyyy) or '*', default: '*'
        :param tz: timezone as string (eg. 'Europe/Vienna') or `pytz.timezone`
        :param args: additional positional arguments, forwarded to `tzcrontab`
        :param kwargs: additional keyword arguments, forwarded to `tzcrontab`
        """
        # The expression order differs from the argument order of this method:
        # minute, hour, day of month, month, day of week, year.
        parts = (minute, hour, day_of_month, month_of_year, day_of_week, year)
        expression = ' '.join(str(part) for part in parts)
        # Pass `tz` positionally: with `tz=tz` the first item of `args` would
        # be bound to the parent's `tz` parameter too and raise a `TypeError`.
        super(pytzcrontab, self).__init__(expression, tz, *args, **kwargs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment