@akaihola · Created August 26, 2013 13:22
Recurring dynamic Celery task manager
import time
from datetime import datetime, timedelta


class InvalidOperation(Exception):
    pass


class RepeatingCacheManager(object):
    """Recurring dynamic Celery task manager

    This class can be used in Celery tasks which should refresh a cache key
    repeatedly at a given interval. It uses locking to make sure that the
    same cache key isn't already being updated. This way we prevent another
    HTTP request from triggering simultaneous tasks.

    Redis is used for locking, and a Django cache backend for caching the
    actual value.

    Usage:

    In your Celery task, instantiate the manager with your cache key and the
    desired interval in seconds. You may also specify an offset in seconds to
    have e.g. a task which runs 15 seconds after every 5-minute mark, and an
    optional cache backend if you don't want to use the default backend::

        @task
        def mytask(<arguments>):
            cacher = RepeatingCacheManager(<cache key>, <interval>, <offset>, <cache>)

    Then check the value of the :attr:`need_refresh` attribute. If it's
    ``False``, do nothing, since another task is already working on the
    cache key::

            if not cacher.need_refresh:
                return

    If :attr:`need_refresh` is ``True``, obtain the value you need to cache
    and call the :meth:`save_and_unlock` method to store it in the cache and
    release the lock::

            value = None
            try:
                value = <obtain value>
            finally:
                cacher.save_and_unlock(value)

    Finally, if the value was successfully obtained, re-schedule your task at
    the next interval using the :attr:`eta` attribute::

            mytask.apply_async(args=[<arguments>], eta=cacher.eta)

    A complete example::

        @task
        def mytask(cache_key, link):
            cacher = RepeatingCacheManager(cache_key, interval=3600, offset=60)
            if not cacher.need_refresh:
                return
            value = None
            try:
                value = get_clicks_last_hour(link)
            finally:
                cacher.save_and_unlock(value)
            mytask.apply_async(args=[cache_key, link], eta=cacher.eta)

        def statistics_view(request, link):
            cache_key = 'clicks-last-hour:{0}'.format(link)
            value = cache.get(cache_key)
            if not value:
                value = '(reload to see number of clicks last hour)'
                mytask.delay(cache_key, link)
            return render_to_response('clicks.html', {'clicks': value})
    """

    def __init__(self,
                 cache_key, interval=600, offset=0, timeout=None, cache=None):
        self.cache_key = cache_key
        self.interval = interval
        self.offset = offset
        self.redis = get_redis()
        if cache:
            self.cache = cache
        else:
            # use settings.CACHES['default'] if cache not specified
            from django.core.cache import cache as default_cache
            self.cache = default_cache
        self._eta = None
        self.cache_lock_key = '{0}:lock'.format(cache_key)
        self.epoch = int(time.time())
        # Increase the lock by the current epoch.
        lock_value = self.redis.incr(self.cache_lock_key, self.epoch)
        if lock_value < self.epoch:
            raise ValueError(
                'RepeatingCacheManager value for {0} in Redis '
                'was {1} before incrementing!'
                .format(self.cache_lock_key, lock_value - self.epoch))
        if lock_value == self.epoch:
            # no other task running, we need to refresh the cache
            self.need_refresh = True
            return
        elapsed_since_other_task = self.epoch - (lock_value - self.epoch)
        if elapsed_since_other_task < (timeout or self.interval // 2):
            # another instance of this task with the same arguments was
            # started and the timeout hasn't yet passed -> cancel
            self.redis.decr(self.cache_lock_key, self.epoch)
            self.need_refresh = False
            return
        # Another instance of this task has timed out.
        # Reset the lock and refresh the cache.
        self.redis.set(self.cache_lock_key, self.epoch)
        self.need_refresh = True

    def save_and_unlock(self, result):
        # sanity check
        if not self.need_refresh:
            raise InvalidOperation(
                "RepeatingCacheManager can't save value to cache since there "
                'is another similar task already running.')
        # cache the result and release the lock
        self.cache.set(self.cache_key, result, self.interval)
        self.redis.decr(self.cache_lock_key, self.epoch)
        # re-schedule this task at the next interval mark plus offset,
        # e.g. with the defaults at the next 10-minute mark
        now = datetime.now()
        midnight = datetime(now.year, now.month, now.day)
        seconds_since_midnight = (now - midnight).seconds
        last_interval = (
            self.interval * (seconds_since_midnight // self.interval))
        next_interval = timedelta(
            seconds=last_interval + self.interval + self.offset)
        self._eta = midnight + next_interval

    @property
    def eta(self):
        # sanity check:
        if not self._eta:
            raise InvalidOperation('RepeatingCacheManager.save_and_unlock() '
                                   'not called, no ETA yet available.')
        return self._eta
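

# ``get_redis()`` is called above and in the tests below, but its definition
# is not included in this gist. The following is a minimal sketch only,
# assuming the redis-py package and a Redis server on localhost; in a real
# project the connection parameters would come from Django settings, and the
# author's actual helper may differ.

import redis

_redis_client = None


def get_redis():
    """Return a shared redis-py client used for the cache-refresh locks."""
    global _redis_client
    if _redis_client is None:
        _redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
    return _redis_client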


# Tests for the manager above (a separate module that imports it as ``rcm``).

from contextlib import nested
from datetime import datetime, timedelta

from django.conf import settings
from django.core.cache import cache as default_cache
from django.test import TestCase
from mock import Mock, patch

# see https://gist.github.com/3312399
from cache_switch import cache_switch

import repeating_cache_manager as rcm


def patch_time(return_value):
    """Patch both time.time() and datetime.now() as seen by the manager."""
    datetime_mock = Mock(
        wraps=datetime,
        now=Mock(return_value=(
            datetime(2012, 9, 10) + timedelta(seconds=return_value))))
    return nested(
        patch.object(rcm.time, 'time', Mock(return_value=return_value)),
        patch.object(rcm, 'datetime', datetime_mock))


@cache_switch('locmem://')
class RepeatingCacheManager_Tests(TestCase):
    # pylint: disable=E1101
    # Instance of <class> has no <member>

    __test__ = settings.TEST_MODE == 'integration'

    def setUp(self):
        rcm.get_redis().flushall()

    def assertLockValue(self, value):
        self.assertEqual(str(value),
                         rcm.get_redis().get(self.cacher.cache_lock_key))

    def test_first_call(self):
        """Cache refresh is needed on first call"""
        with patch_time(100):
            self.cacher = rcm.RepeatingCacheManager('key',
                                                    interval=10,
                                                    offset=0,
                                                    timeout=2)
            self.assertTrue(self.cacher.need_refresh)
            self.assertLockValue(100)
            self.cacher.save_and_unlock('value')
            self.assertEqual('value', default_cache.get('key'))
            self.assertLockValue(0)
            self.assertEqual(datetime(2012, 9, 10, 0, 1, 50), self.cacher.eta)

    def test_second_call(self):
        """Cache refresh is needed on second call if first call completed"""
        with patch_time(100):
            self.cacher = rcm.RepeatingCacheManager('key',
                                                    interval=10,
                                                    offset=0,
                                                    timeout=2)
            self.assertTrue(self.cacher.need_refresh)
            self.cacher.save_and_unlock('value')
        with patch_time(110):
            self.cacher = rcm.RepeatingCacheManager('key',
                                                    interval=10,
                                                    offset=0,
                                                    timeout=2)
            self.assertTrue(self.cacher.need_refresh)
            self.assertLockValue(110)
            self.cacher.save_and_unlock('new value')
            self.assertEqual('new value', default_cache.get('key'))
            self.assertLockValue(0)
            self.assertEqual(datetime(2012, 9, 10, 0, 2), self.cacher.eta)

    def test_second_call_when_first_running(self):
        """Cache refresh not needed on second call if first call working"""
        with patch_time(100):
            self.cacher = rcm.RepeatingCacheManager('key',
                                                    interval=10,
                                                    offset=0,
                                                    timeout=2)
            self.assertTrue(self.cacher.need_refresh)
            self.assertLockValue(100)
        with patch_time(101):
            self.cacher2 = rcm.RepeatingCacheManager('key',
                                                     interval=10,
                                                     offset=0,
                                                     timeout=2)
            self.assertLockValue(100)
            self.assertFalse(self.cacher2.need_refresh)
            self.cacher.save_and_unlock('new value')
            self.assertLockValue(0)
            self.assertEqual(datetime(2012, 9, 10, 0, 1, 50), self.cacher.eta)

    def test_second_call_when_first_timeout(self):
        """Cache refresh needed on second call if first call timeout"""
        with patch_time(100):
            self.cacher = rcm.RepeatingCacheManager('key',
                                                    interval=10,
                                                    offset=0,
                                                    timeout=2)
            self.assertTrue(self.cacher.need_refresh)
        with patch_time(103):
            self.cacher2 = rcm.RepeatingCacheManager('key',
                                                     interval=10,
                                                     offset=0,
                                                     timeout=2)
            self.assertTrue(self.cacher2.need_refresh)
            self.assertLockValue(103)
            self.cacher2.save_and_unlock('new value')
            self.assertLockValue(0)
            self.assertRaises(rcm.InvalidOperation, lambda: self.cacher.eta)
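

# A minimal, self-contained sketch of the epoch-based lock arithmetic used in
# RepeatingCacheManager.__init__() and save_and_unlock() above, with a plain
# dict standing in for Redis (the counter semantics mirror INCR/DECR). The
# helper names below are hypothetical and exist only for this illustration.


def demo_lock_arithmetic():
    counter = {'key:lock': 0}

    def incr(key, amount):
        counter[key] += amount
        return counter[key]

    def decr(key, amount):
        counter[key] -= amount
        return counter[key]

    # A task starting at epoch 100 increments the lock by its own epoch.
    # The result equals its epoch, so no other task holds the lock.
    assert incr('key:lock', 100) == 100

    # A second task starting at epoch 101 increments by 101 and gets 201.
    # Subtracting its own epoch reveals the other task's start time (100);
    # only 1 second has elapsed, which is below the timeout, so it backs
    # off by undoing its increment.
    assert incr('key:lock', 101) == 201
    assert decr('key:lock', 101) == 100

    # When the first task calls save_and_unlock(), it decrements by its own
    # epoch, returning the counter to zero and releasing the lock.
    assert decr('key:lock', 100) == 0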