Last active
October 5, 2017 18:23
-
-
Save PritishC/b1b8c4cd920046053708c49274a064d5 to your computer and use it in GitHub Desktop.
Celery Lock Decorator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from functools import wraps | |
from django.core.cache import get_cache # deprecated in newer versions of django; use caches array. | |
def once_only_task(fun):
    """Serialize executions of a bound Celery task using a cache-based lock.

    The wrapper first tries to add a lock key to the ``"celery_lock"`` cache.
    If the key was added, the task body runs and the key is deleted
    afterwards, whether the body returned or raised. If the key already
    exists, a message is printed and the task is retried with a countdown of
    ``settings.CELERY_LOCK_EXPIRES``.

    Callers may pass a ``lock_key`` keyword argument to select the lock; it
    is consumed by the wrapper and is not forwarded to ``fun``. When it is
    absent, ``settings.CELERY_DEFAULT_LOCK_KEY`` is used.

    NOTE: Tasks on which this is applied must be bound tasks (bind=True),
    because the wrapper calls ``self.retry``.

    NOTE(review): the stored lock value carries no owner information. If the
    task body runs longer than ``settings.CELERY_LOCK_EXPIRES``, the key can
    expire, be acquired by another task, and then be deleted by this one.
    Pick an expiry comfortably longer than the slowest expected run.
    """
    @wraps(fun)
    def outer(self, *args, **kwargs):
        # `settings` was referenced by this block without being imported
        # anywhere in the module. Assumed to be Django's settings object --
        # TODO confirm if the project uses a different settings module.
        # Imported here so that decorating a task does not require Django
        # to be configured yet.
        from django.conf import settings

        # A separate DB cache named "celery_lock" was used for the original
        # use-case. Django's locmem cache is per-process and each celery
        # worker is a process, which means different cache instances,
        # rendering it useless as a lock.
        cache = get_cache("celery_lock")
        lock_key = kwargs.pop('lock_key', settings.CELERY_DEFAULT_LOCK_KEY)

        # Django cache.add returns True only if the key was actually added.
        # Ensure that the cache backend has atomic add if you want really
        # precise locking.
        if not cache.add(lock_key, True, settings.CELERY_LOCK_EXPIRES):
            # Parenthesised so the line is valid under both the Python 2
            # print statement and the Python 3 print function.
            print("%s<%s> couldn't acquire a lock; will be retried" %
                  (fun, args))
            # `raise` makes it explicit that nothing runs after the retry
            # request; this is the form Celery's documentation recommends.
            raise self.retry(countdown=settings.CELERY_LOCK_EXPIRES)

        try:
            return fun(self, *args, **kwargs)
        finally:
            # cache.delete is silent if the key does not exist.
            cache.delete(lock_key)
    return outer
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment