Celery Task with lock
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from celery import Task | |
from django.conf import settings | |
from django.core.cache import caches | |
from celery.utils.log import get_task_logger | |
logger = get_task_logger(__name__) | |
# noinspection PyAbstractClass | |
class TaskWithLock(Task): | |
""" | |
Base task with lock to prevent multiple execution of tasks with ETA. | |
It's happens with multiple workers for tasks with any delay (countdown, ETA). | |
You may override cache backend by setting `CELERY_TASK_LOCK_CACHE` in your Django settings file | |
""" | |
abstract = True | |
cache = caches[getattr(settings, 'CELERY_TASK_LOCK_CACHE', 'default')] | |
lock_expiration = 60 * 60 * 24 # 1 day | |
@property | |
def lock_key(self): | |
""" | |
Unique string for task as lock key | |
""" | |
return 'TaskLock_%s_%s_%s' % (self.__class__.__name__, self.request.id, self.request.retries) | |
def acquire_lock(self): | |
""" | |
Set lock | |
""" | |
result = self.cache.add(self.lock_key, True, self.lock_expiration) | |
logger.debug('Acquiring %s key %s', self.lock_key, 'succeed' if result else 'failed') | |
return result | |
def __call__(self, *args, **kwargs): | |
""" | |
Checking for lock existence | |
""" | |
if self.acquire_lock(): | |
logger.debug('Task %s execution with lock started', self.request.id) | |
return super(TaskWithLock, self).__call__(*args, **kwargs) | |
logger.warning('Task %s skipped due lock detection', self.request.id) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Found this code hanging out in a production codebase without a reference, so thanks for the inadvertent contribution. I've linked to your gist for future reference. As this is an adaptation of the
memcached
example from the Celery Cookbook, you should consider submitting it to the project!Redis also supports a native
lock
operation that could probably simplify this implementation, and I found an alternative gist that demonstrates the technique.Do you use this one in production? Have you refined this further?
What do you think about the alternative?