Last active February 5, 2024 05:51
Celery Task with lock
from celery import Task
from django.conf import settings
from django.core.cache import caches
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)


# noinspection PyAbstractClass
class TaskWithLock(Task):
    """
    Base task with a lock to prevent multiple executions of tasks with an ETA.
    This happens with multiple workers for tasks with any delay (countdown, ETA).
    You may override the cache backend by setting `CELERY_TASK_LOCK_CACHE` in your Django settings file.
    """
    abstract = True
    cache = caches[getattr(settings, 'CELERY_TASK_LOCK_CACHE', 'default')]
    lock_expiration = 60 * 60 * 24  # 1 day

    @property
    def lock_key(self):
        """
        Unique string for the task, used as the lock key.
        """
        # The middle field is missing in the source; the task id is assumed here.
        return 'TaskLock_%s_%s_%s' % (self.__class__.__name__, self.request.id, self.request.retries)

    def acquire_lock(self):
        """
        Set the lock. cache.add() stores the key only if it does not exist yet.
        """
        result = self.cache.add(self.lock_key, True, self.lock_expiration)
        logger.debug('Acquiring %s key %s', self.lock_key, 'succeeded' if result else 'failed')
        return result

    def __call__(self, *args, **kwargs):
        """
        Run the task only if the lock was acquired.
        """
        if self.acquire_lock():
            # The log argument is truncated in the source; the task id is assumed.
            logger.debug('Task %s execution with lock started', self.request.id)
            return super(TaskWithLock, self).__call__(*args, **kwargs)
        logger.warning('Task %s skipped due to lock detection', self.request.id)
Found this code hanging out in a production codebase without a reference, so thanks for the inadvertent contribution. I've linked to your gist for future reference. As this is an adaptation of the memcached example from the Celery Cookbook, you should consider submitting it to the project!

Redis also supports a native lock operation that could probably simplify this implementation, and I found an alternative gist that demonstrates the technique.
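
For reference, here is a minimal sketch of what the Redis-based approach could look like. It is not the code from the alternative gist. It assumes `client` is a redis-py client (for example `redis.Redis()`), whose `lock()` method returns a lock object with an expiry. The helper name `run_once`, the key format, and the `LOCK_TTL_SECONDS` constant are made up for illustration.

```python
import logging

logger = logging.getLogger(__name__)

# Hypothetical constant mirroring lock_expiration in the gist (1 day).
LOCK_TTL_SECONDS = 60 * 60 * 24


def run_once(client, task_id, retries, func, *args, **kwargs):
    """Run func only if no other worker has claimed this task id/retry pair.

    `client` is assumed to be a redis-py client; client.lock() returns a
    lock object that expires after `timeout` seconds.
    """
    key = 'TaskLock_%s_%s' % (task_id, retries)
    lock = client.lock(key, timeout=LOCK_TTL_SECONDS)
    # blocking=False returns immediately instead of waiting for the holder.
    if not lock.acquire(blocking=False):
        logger.warning('Task %s skipped due to lock detection', task_id)
        return None
    # The lock is deliberately not released: as with the cache.add() version,
    # it expires on its own, so a duplicate delivered later is still rejected.
    return func(*args, **kwargs)
```

Inside a task, this would presumably be called with `self.request.id` and `self.request.retries`. Whether it is actually simpler than `cache.add()` depends on whether Redis is already the broker or cache backend.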

Do you use this one in production? Have you refined this further?

What do you think about the alternative?

