Skip to content

Instantly share code, notes, and snippets.

@pardo
pardo / debounced_celery_task.py
Last active September 8, 2023 08:04
Debounced celery task in python
def debounced_wrap(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
key = kwargs.pop("key") # it's required
call_count = kwargs.pop("call_count", 1)
count = cache.get(key, 1)
if count > call_count:
# someone called the function again before the this was executed
return None
# I'm the last call