Skip to content

Instantly share code, notes, and snippets.

@chronossc
Created January 2, 2013 02:00
Show Gist options
  • Save chronossc/4431578 to your computer and use it in GitHub Desktop.
Save chronossc/4431578 to your computer and use it in GitHub Desktop.
class LockedTask(Task):
    """Abstract task base that guards execution with a cache-backed lock.

    Based on
    http://celery.readthedocs.org/en/latest/tutorials/task-cookbook.html#ensuring-a-task-is-only-executed-one-at-a-time

    The lock key is built from the task name plus a hash of the call
    arguments, so invocations with equal arguments share a single lock.

    Only call ``release_lock()`` after ``is_locked()`` returned ``False``.
    ``acquire_lock()`` remembers the key even when the lock is held by
    someone else, so releasing after a failed acquire would delete a lock
    this execution does not own.

    NOTE(review): the current key is stored on the task instance. If one
    instance serves overlapping executions they would overwrite each
    other's key -- confirm against the worker pool in use.
    """

    # Base class only; presumably keeps Celery from registering it as a
    # runnable task (see the Celery docs on abstract tasks).
    abstract = True

    # Lock lifetime in minutes; is_locked() converts it to seconds.
    lock_expire = 5

    # Key of the most recently requested lock; None until acquire_lock()
    # has run, which lets release_lock() be called safely before that.
    __lock_key = None

    def __init__(self, *a, **kw):
        """Initialise the task and cache its logger on ``self.logger``."""
        super(LockedTask, self).__init__(*a, **kw)
        self.logger = self.get_logger()

    def lock_key(self, *a, **kw):
        """Return the cache key identifying this task call.

        Positional and keyword arguments default to those of the current
        request (``self.request.args`` / ``self.request.kwargs``) when not
        given. The arguments must be serialisable by ``simplejson``.

        Returns a string of the form ``"<task name>-lock-<md5 hex>"``.
        """
        if not a:
            a = self.request.args
        if not kw:
            kw = self.request.kwargs
        # sort_keys makes the serialisation independent of dict ordering,
        # so equal arguments always hash to the same lock key.
        s = simplejson.dumps({'a': a, 'kw': kw}, sort_keys=True)
        # md5 needs bytes; encode when the serialiser handed back text.
        if not isinstance(s, bytes):
            s = s.encode('utf-8')
        h = md5(s).hexdigest()
        return "%s-lock-%s" % (self.name, h)

    def is_locked(self, *a, **kw):
        """Check whether the task is already locked; if not, lock it.

        Arguments are forwarded to ``lock_key()``.

        Returns ``True`` when the lock was already held (the caller should
        not proceed), ``False`` when this call created the lock.
        """
        lock_key = self.lock_key(*a, **kw)
        if self.acquire_lock(lock_key, self.lock_expire * 60):
            return False
        self.logger.warning(
            "Task %s already running with lock %s.", self.name, lock_key)
        return True

    def acquire_lock(self, lock_key, lock_expire):
        """Create the lock, or renew its expiry when it already exists.

        ``lock_key`` is the cache key to lock on and ``lock_expire`` the
        timeout handed to the cache (``is_locked()`` passes seconds).

        Returns the result of ``cache.add``: truthy when the lock was
        created by this call, falsy when it already existed.

        NOTE(review): a failed acquire still extends the existing lock,
        and add() followed by set() is two separate cache calls; a lock
        whose owner died stays alive as long as other calls keep arriving.
        Kept as in the original design -- confirm this is intended.
        """
        self.__lock_key = lock_key
        # cache.add fails if the key already exists, returning false
        cache_add = cache.add(lock_key, "true", lock_expire)
        if cache_add:
            self.logger.info("Lock created for %s.", self.name)
        else:
            # if running, renew expire
            self.logger.debug("Renew lock for %s", self.name)
            cache.set(lock_key, "true", lock_expire)
        return cache_add

    def release_lock(self):
        """Delete the lock whose key was stored by ``acquire_lock()``.

        Does nothing (apart from a debug log entry) when no key has been
        stored on this instance yet.
        """
        if self.__lock_key is None:
            self.logger.debug("No lock to release for %s", self.name)
            return
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        cache.delete(self.__lock_key)
        self.logger.info(
            "Released lock for %s with key %s", self.name, self.__lock_key)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment