Created
August 9, 2020 02:06
-
-
Save aaronpolhamus/cb305a3350f943215d00b66c85f576ea to your computer and use it in GitHub Desktop.
task locking with redis + celery
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Task locking with redis in celery is hard, and good examples are tough to come by. This is the approach that's | |
worked for me, based on great work that other folks have posted: | |
* https://breadcrumbscollector.tech/what-is-celery-beat-and-how-to-use-it-part-2-patterns-and-caveats/ | |
* http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html | |
* https://redis.io/topics/distlock | |
This isn't polished, but hopefully it's useful. To verify it in our local test env, we register the following test task in
our definitions file:
*** definitions.py *** | |
@celery.task(name="async_test_task_lock") | |
@task_lock(main_key="async_test_task_lock", timeout=UPDATE_GAME_DATA_TIMEOUT) | |
def async_test_task_lock(game_id): | |
print(f"processing game_id {game_id}") | |
time.sleep(TASK_LOCK_TEST_SLEEP) | |
*** test_celery_tasks.py *** | |
and here's the test, using unittest.TestCase | |
from backend.tasks.definitions import async_test_task_lock, TASK_LOCK_TEST_SLEEP | |
from backend.tasks.redis_handlers import rds, TASK_LOCK_MSG | |
class TestTaskLocking(TestCase): | |
def test_task_locking(self): | |
rds.flushall() | |
res1 = async_test_task_lock.delay(3) | |
res2 = async_test_task_lock.delay(5) | |
self.assertFalse(res1.ready()) | |
self.assertFalse(res2.ready()) | |
res3 = async_test_task_lock.delay(5) | |
res4 = async_test_task_lock.delay(5) | |
self.assertEqual(res3.get(), TASK_LOCK_MSG) | |
self.assertEqual(res4.get(), TASK_LOCK_MSG) | |
time.sleep(TASK_LOCK_TEST_SLEEP) | |
res5 = async_test_task_lock.delay(3) | |
self.assertFalse(res5.ready()) | |
""" | |
import base64 | |
from contextlib import contextmanager | |
import json | |
import pickle as pkl | |
import uuid | |
from backend.config import Config | |
from redis import StrictRedis | |
from redis_cache import RedisCache | |
from redlock import Redlock | |
# Text client: responses are decoded to ``str`` (used for lock keys and JSON payloads).
# ``encoding`` replaces the deprecated ``charset`` keyword of redis-py; the effect is the same.
rds = StrictRedis(Config.REDIS_HOST, decode_responses=True, encoding="utf-8")

# Binary client: responses stay as raw ``bytes`` because the cache below stores pickled values.
rds_cache = StrictRedis(Config.REDIS_HOST, decode_responses=False, encoding="utf-8")

# NOTE(security): values are deserialized with ``pickle.loads``; this is only safe as long as
# nothing untrusted can write to this Redis instance.
redis_cache = RedisCache(redis_client=rds_cache, prefix="rc", serializer=pkl.dumps, deserializer=pkl.loads)

# Redlock manager pointed at the same Redis host.
dlm = Redlock([{"host": Config.REDIS_HOST}])
# Value returned by a locked task when another invocation already holds the lock.
TASK_LOCK_MSG = "Task execution skipped -- another task already has the lock"

# Expiration times, in seconds.
DEFAULT_ASSET_EXPIRATION = 60 * 60 * 24 * 8  # eight days: default lifetime for cached values
DEFAULT_CACHE_EXPIRATION = 60 * 60 * 24 * 1  # one day: shorter lifetime for cached values
# Lua script evaluated on the Redis server: deletes the key KEYS[1] only when its current
# value equals ARGV[1] (the token written by the lock holder); otherwise it returns 0 and
# leaves the key untouched.
REMOVE_ONLY_IF_OWNER_SCRIPT = """
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
"""
@contextmanager
def redis_lock(lock_name, expires=60):
    """Try to take a named Redis lock without blocking and yield whether it was acquired.

    The lock is a key ``lock_name`` set with ``NX`` (only if absent) to a random token.
    On exit the key is removed only if it still holds that token, so a lock that has
    expired and been re-acquired by someone else is never deleted by mistake.

    Args:
        lock_name: Redis key used as the lock.
        expires: Lifetime of the key in seconds, passed as ``ex`` to ``SET``. ``None``
            means the key never expires on its own.

    Yields:
        bool: ``True`` if this caller acquired the lock, ``False`` otherwise. The body of
        the ``with`` statement runs in both cases and must check the value.
    """
    # https://breadcrumbscollector.tech/what-is-celery-beat-and-how-to-use-it-part-2-patterns-and-caveats/
    random_value = str(uuid.uuid4())
    lock_acquired = bool(
        rds.set(lock_name, random_value, ex=expires, nx=True)
    )
    print(f'Lock acquired? {lock_name} for {expires} - {lock_acquired}')
    try:
        yield lock_acquired
    finally:
        # Release in ``finally`` so that an exception raised inside the ``with`` body does not
        # leave the lock held until it expires (or forever, when ``expires`` is None).
        if lock_acquired:
            # if lock was acquired, then try to release it BUT ONLY if we are the owner
            # (i.e. value inside is identical to what we put there originally)
            rds.eval(REMOVE_ONLY_IF_OWNER_SCRIPT, 1, lock_name, random_value)
            print(f'Lock {lock_name} released!')
def argument_signature(*args, **kwargs):
    """Return a base64 string identifying a call's arguments, for use in lock keys.

    Positional arguments are converted with ``str`` and joined by ``_``. Keyword arguments
    are rendered as ``key:value``, sorted by key, and joined by ``_``. The two parts are
    separated by ``-`` and the whole string is base64-encoded.

    Keyword arguments are sorted so that the same logical call produces the same signature
    no matter in which order the keywords were passed; otherwise ``f(a=1, b=2)`` and
    ``f(b=2, a=1)`` would end up with different lock keys.

    Note that the signature relies on ``str`` of each value and on ``_`` as a separator, so
    distinct argument lists whose string forms join to the same text are not distinguished.
    """
    positional = "_".join(str(value) for value in args)
    # Keys are unique strings, so sorting the items never compares the values.
    keyword = "_".join(f"{key!s}:{value!s}" for key, value in sorted(kwargs.items()))
    return base64.b64encode(f"{positional}-{keyword}".encode()).decode()
def task_lock(func=None, main_key="", timeout=None):
    """Decorator that skips a call when an identical call already holds the lock.

    The lock key is ``main_key`` plus ``argument_signature`` of the call's arguments, so
    calls with different arguments do not block each other. When the lock cannot be
    acquired the wrapped function is not run and ``TASK_LOCK_MSG`` is returned instead.

    Can be used bare (``@task_lock``) or with arguments (``@task_lock(main_key=..., timeout=...)``).

    Args:
        func: The function to wrap when used without parentheses; otherwise ``None``.
        main_key: Prefix of the Redis lock key.
        timeout: Lock lifetime in seconds, forwarded to ``redis_lock``. ``None`` means the
            lock key has no expiry.

    Returns:
        The wrapped function, or a decorator when ``func`` is ``None``.
    """
    # Local import so this block does not depend on the file's import section.
    from functools import wraps

    def _dec(run_func):
        # ``wraps`` keeps the wrapped function's name and docstring, so every decorated
        # function is not reported as ``_caller``.
        @wraps(run_func)
        def _caller(*args, **kwargs):
            lock_name = f"{main_key}_{argument_signature(*args, **kwargs)}"
            with redis_lock(lock_name, timeout) as acquired:
                if not acquired:
                    return TASK_LOCK_MSG
                return run_func(*args, **kwargs)
        return _caller
    return _dec(func) if func is not None else _dec
def unpack_redis_json(key: str):
    """Fetch ``key`` from Redis and decode its value as JSON.

    Returns the decoded object, or ``None`` when the key is not present.
    """
    raw = rds.get(key)
    if raw is None:
        return None
    return json.loads(raw)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Found this gist looking for the source of another example that I found in a production codebase: https://gist.github.com/Skyross/2f4c95f5df2446b71f74f4f9d9771125
Thanks for your ideas on this!
We're using a custom
celery.Task
subtask like the above example, but I think that using Redis.lock
would simplify the implementation significantly. Have you encountered any difficulty with it in production?