@PritishC
Last active October 5, 2017 18:23
Celery Lock Decorator
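The decorator reads one named cache and two settings from the Django configuration. A minimal sketch of what that configuration might look like follows. The setting names `CELERY_DEFAULT_LOCK_KEY` and `CELERY_LOCK_EXPIRES` and the cache alias `celery_lock` come from the code; the values, the table name `celery_lock_table`, and the `default` cache entry are assumptions for illustration only.

```python
# settings.py (sketch; all values are illustrative assumptions)

CACHES = {
    # Assumed default cache; per-process, so unsuitable for cross-worker locks.
    "default": {
        "BACKEND": "django.core.cache.backends.locmem.LocMemCache",
    },
    # Shared cache used for locking. With the database backend, LOCATION is
    # the table name, created with `python manage.py createcachetable`.
    "celery_lock": {
        "BACKEND": "django.core.cache.backends.db.DatabaseCache",
        "LOCATION": "celery_lock_table",  # hypothetical table name
    },
}

# Key used when a task call does not pass its own lock_key.
CELERY_DEFAULT_LOCK_KEY = "celery_default_lock"  # assumed value

# Lock lifetime in seconds; also used as the retry countdown.
CELERY_LOCK_EXPIRES = 60  # assumed value
```

Any backend shared between worker processes should work here; the expiry is best chosen longer than the slowest expected task run, since the lock disappears once it times out.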
# -*- coding: utf-8 -*-
from functools import wraps

from django.conf import settings
# get_cache was deprecated in Django 1.7 and removed in 1.9; use the caches mapping.
from django.core.cache import caches


def once_only_task(fun):
    """
    Decorator that applies cache-based locking to Celery tasks.

    If another task already holds the lock and is executing, the decorated
    task is retried after a set amount of time.

    NOTE: Tasks on which this is applied must be bound tasks (bind=True).
    """
    @wraps(fun)
    def outer(self, *args, **kwargs):
        # I used a separate DB cache for my use-case with the name "celery_lock".
        # Django's locmem cache is per-process and each Celery worker is a process,
        # so every worker would get its own cache instance, rendering the lock useless.
        cache = caches["celery_lock"]
        lock_key = kwargs.pop('lock_key', settings.CELERY_DEFAULT_LOCK_KEY)

        # cache.add returns True only if the key was actually added.
        # Ensure that the cache backend has an atomic add if you want really precise locking.
        if cache.add(lock_key, True, settings.CELERY_LOCK_EXPIRES):
            try:
                return fun(self, *args, **kwargs)
            finally:
                # cache.delete is silent if the key does not exist.
                cache.delete(lock_key)
        else:
            print("%s<%s> couldn't acquire a lock; will be retried"
                  % (fun.__name__, args))
            # Task.retry raises an exception that makes the worker reschedule the task.
            raise self.retry(countdown=settings.CELERY_LOCK_EXPIRES)
    return outer
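The add-then-delete locking pattern used by the decorator can be tried without Celery or Django. The sketch below is not the gist's code: `InMemoryCache`, `RetryRequested`, `FakeTask`, `once_only`, `work`, `LOCK_KEY` and `LOCK_EXPIRES` are hypothetical stand-ins that only mimic the pieces the decorator relies on (an `add` that reports whether the key was stored, a silent `delete`, and a `retry` that raises).

```python
from functools import wraps


class InMemoryCache:
    """Hypothetical stand-in for a Django cache; supports add() and delete() only."""

    def __init__(self):
        self._data = {}

    def add(self, key, value, timeout=None):
        # Store the value only if the key is absent and report whether it
        # was stored. The timeout is accepted but ignored in this sketch.
        if key in self._data:
            return False
        self._data[key] = value
        return True

    def delete(self, key):
        # Silent if the key does not exist.
        self._data.pop(key, None)


class RetryRequested(Exception):
    """Hypothetical stand-in for the exception raised by a task's retry()."""


class FakeTask:
    """Hypothetical stand-in for a bound task instance (the `self` argument)."""

    def retry(self, countdown=None):
        raise RetryRequested(countdown)


cache = InMemoryCache()
LOCK_KEY = "demo_lock"
LOCK_EXPIRES = 60


def once_only(fun):
    @wraps(fun)
    def outer(self, *args, **kwargs):
        if cache.add(LOCK_KEY, True, LOCK_EXPIRES):
            try:
                return fun(self, *args, **kwargs)
            finally:
                cache.delete(LOCK_KEY)
        self.retry(countdown=LOCK_EXPIRES)
    return outer


@once_only
def work(self, x):
    return x * 2


task = FakeTask()
first = work(task, 21)        # lock is free, so the body runs

cache.add(LOCK_KEY, True)     # simulate another worker holding the lock
try:
    work(task, 21)
    second = "ran"
except RetryRequested:
    second = "retried"
cache.delete(LOCK_KEY)        # the other "worker" releases the lock
```

With a real Celery app, the task decorator would presumably sit outermost, for example `@app.task(bind=True)` above `@once_only_task`, so that the bound task instance is passed through as `self`; `app` here is an assumed Celery application object.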