Created August 9, 2020 02:06
task locking with redis + celery
"""Task locking with redis in celery is hard, and good examples are tough to come by. This is the approach that's
worked for me, based on great work that other folks have posted:
This isn't polished, but hopefully it's useful. To verify it in our local test environment, we register the following
test task in our definitions file:
*** ***
@task_lock(main_key="async_test_task_lock", timeout=UPDATE_GAME_DATA_TIMEOUT)
def async_test_task_lock(game_id):
    print(f"processing game_id {game_id}")
*** ***
and here's the test, using unittest.TestCase
from backend.tasks.definitions import async_test_task_lock, TASK_LOCK_TEST_SLEEP
from backend.tasks.redis_handlers import rds, TASK_LOCK_MSG
class TestTaskLocking(TestCase):
    def test_task_locking(self):
        res1 = async_test_task_lock.delay(3)
        res2 = async_test_task_lock.delay(5)
        res3 = async_test_task_lock.delay(5)
        res4 = async_test_task_lock.delay(5)
        self.assertEqual(res3.get(), TASK_LOCK_MSG)
        self.assertEqual(res4.get(), TASK_LOCK_MSG)
        res5 = async_test_task_lock.delay(3)
"""
import base64
from contextlib import contextmanager
import json
import pickle as pkl
import uuid
from backend.config import Config
from redis import StrictRedis
from redis_cache import RedisCache
from redlock import Redlock
# Text-mode client: decode_responses=True, so replies are returned as str.
# This is the client the lock and JSON helpers in this module talk to.
rds = StrictRedis(Config.REDIS_HOST, decode_responses=True, charset="utf-8")
# Byte-mode client (decode_responses=False) handed to RedisCache below, which
# stores pickled payloads rather than text.
rds_cache = StrictRedis(Config.REDIS_HOST, decode_responses=False, charset="utf-8")
# Function-result cache: keys are prefixed with "rc" and values are serialized with pickle.
# NOTE(review): pickle.loads on cached bytes is only safe if everything that can
# write to this Redis instance is trusted -- confirm.
redis_cache = RedisCache(redis_client=rds_cache, prefix="rc", serializer=pkl.dumps, deserializer=pkl.loads)
# Redlock manager pointed at the same Redis host.
dlm = Redlock([{"host": Config.REDIS_HOST}])
# Message that marks a task invocation as skipped because the lock was already
# held; the example test in the module docstring asserts on this value.
TASK_LOCK_MSG = "Task execution skipped -- another task already has the lock"
# Expiration periods, expressed in seconds.
DEFAULT_ASSET_EXPIRATION = 8 * 24 * 60 * 60 # by default keep cached values around for 8 days
DEFAULT_CACHE_EXPIRATION = 1 * 24 * 60 * 60 # we can keep cached values around for a shorter period of time
# Lua script run atomically by Redis: delete the lock key only if it still holds
# the token that this process stored. This prevents a worker whose lock already
# expired from deleting a lock that another worker has since acquired.
REMOVE_ONLY_IF_OWNER_SCRIPT = """
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end
"""


@contextmanager
def redis_lock(lock_name, expires=60):
    """Try to take a Redis-backed lock for the duration of a ``with`` block.

    The lock is a plain key written with ``SET ... NX`` holding a random token.
    The context manager never blocks or retries: it yields immediately and the
    caller must check the yielded flag.

    Args:
        lock_name: Redis key used as the lock.
        expires: Value passed as ``ex`` to ``rds.set`` (key lifetime in
            seconds). NOTE(review): passing ``None`` sets the key without an
            expiry, so a crashed holder would never release it -- confirm
            callers always pass a value.

    Yields:
        bool: ``True`` if this call created the key (lock acquired), ``False``
        if the key already existed.
    """
    random_value = str(uuid.uuid4())
    lock_acquired = bool(
        rds.set(lock_name, random_value, ex=expires, nx=True)
    )
    print(f'Lock acquired? {lock_name} for {expires} - {lock_acquired}')
    try:
        yield lock_acquired
    finally:
        # Release in ``finally`` so an exception inside the guarded block does
        # not leave the lock held until it expires.
        if lock_acquired:
            # if lock was acquired, then try to release it BUT ONLY if we are the owner
            # (i.e. value inside is identical to what we put there originally)
            rds.eval(REMOVE_ONLY_IF_OWNER_SCRIPT, 1, lock_name, random_value)
            print(f'Lock {lock_name} released!')
def argument_signature(*args, **kwargs):
    """Build a compact, key-safe string identifying a set of call arguments.

    Positional arguments are stringified and joined with ``_``; keyword
    arguments are rendered as ``name:value``, joined with ``_``; the two parts
    are joined with ``-`` and the result is base64-encoded.

    Keyword arguments are sorted by name so that the same logical call produces
    the same signature regardless of the order in which keywords were written.

    Args:
        *args: Positional arguments of the call being identified.
        **kwargs: Keyword arguments of the call being identified.

    Returns:
        str: Base64 text (ASCII) derived from the arguments.
    """
    positional = "_".join(str(arg) for arg in args)
    # Keys are unique, so sorting the (name, value) pairs never compares values.
    keyword = "_".join(f"{name!s}:{value!s}" for name, value in sorted(kwargs.items()))
    return base64.b64encode(f"{positional}-{keyword}".encode()).decode()
def task_lock(func=None, main_key="", timeout=None):
    """Decorator that skips a call when an identical call already holds the lock.

    The lock name is ``main_key`` plus the signature of the call's arguments, so
    calls with different arguments do not block each other.

    Usable both bare (``@task_lock``) and with options
    (``@task_lock(main_key=..., timeout=...)``).

    Args:
        func: The function to wrap when used as a bare decorator; ``None`` when
            called with options.
        main_key: Prefix for the Redis lock key.
        timeout: Forwarded to ``redis_lock`` as the lock expiry.
            NOTE(review): the default ``None`` is forwarded as-is, which
            overrides ``redis_lock``'s own default -- confirm this is intended.

    Returns:
        The wrapped function, or a decorator when ``func`` is ``None``. The
        wrapped function returns ``TASK_LOCK_MSG`` when the lock is already
        held, otherwise the wrapped function's own return value.
    """
    # Function-scope import so this block does not depend on a file-level import.
    from functools import wraps

    def _dec(run_func):
        # Preserve the wrapped function's name and metadata so every decorated
        # function is not presented as "_caller".
        @wraps(run_func)
        def _caller(*args, **kwargs):
            lock_name = f"{main_key}_{argument_signature(*args, **kwargs)}"
            with redis_lock(lock_name, timeout) as acquired:
                if not acquired:
                    # Someone else is already running this exact call: skip it.
                    return TASK_LOCK_MSG
                return run_func(*args, **kwargs)

        return _caller

    return _dec(func) if func is not None else _dec
def unpack_redis_json(key: str):
    """Fetch ``key`` from Redis and decode its value as JSON.

    Returns the decoded object, or ``None`` when the key has no value.
    """
    raw = rds.get(key)
    if raw is None:
        return None
    return json.loads(raw)
Found this gist looking for the source of another example that I found in a production codebase:

Thanks for your ideas on this!

We're using a custom celery.Task subtask like the above example, but I think that using Redis.lock would simplify the implementation significantly. Have you encountered any difficulty with it in production?

