Skip to content

Instantly share code, notes, and snippets.

@LowerDeez
Last active December 26, 2019 08:52
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save LowerDeez/d36f30456dc6b690e043ec59defdc6d5 to your computer and use it in GitHub Desktop.
Save LowerDeez/d36f30456dc6b690e043ec59defdc6d5 to your computer and use it in GitHub Desktop.
Celery: preventing simultaneous execution of the same task
from functools import wraps
from django.core.cache import cache
from celery.five import monotonic
# Lifetime of the lock's cache entry: passed as the timeout argument to
# cache.add() by no_simultaneous_execution, so a lock left behind by a
# crashed worker eventually disappears on its own.
# NOTE(review): Django cache timeouts are in seconds - confirm 10 s is longer
# than the slowest decorated task, otherwise the lock expires mid-run.
CACHE_LOCK_EXPIRE = 10
def no_simultaneous_execution(f):
    """
    Prevent a bound task from running concurrently with identical arguments.

    Based on https://stackoverflow.com/a/59301422/8342448

    The decorated function must be a bound Celery task (``bind=True``), i.e.
    its first argument is the task instance. Before running, the wrapper tries
    to create a cache entry keyed on the task name and call arguments:

    * if the entry already exists (another execution holds the lock), the
      task is re-queued with the same positional *and* keyword arguments
      after a 3 second countdown, and the wrapper returns ``None``;
    * otherwise the wrapped function runs and its return value is returned.

    The cache entry expires after ``CACHE_LOCK_EXPIRE``, so a lock can never
    be held forever, but a task that runs longer than that is no longer
    protected against a concurrent duplicate.
    """
    @wraps(f)
    def wrapper(self, *args, **kwargs):
        # Create lock_id used as cache key. Keyword arguments are sorted so
        # that the same call yields the same key regardless of the order in
        # which the caller happened to pass them.
        lock_id = '{}-{}-{}'.format(self.name, args, sorted(kwargs.items()))
        print('Lock id:', lock_id)

        # Timeout with a small diff, so we'll leave the lock delete
        # to the cache if it's close to being auto-removed/expired
        timeout_at = monotonic() + CACHE_LOCK_EXPIRE - 3
        print('Timeout at:', timeout_at)

        # Try to acquire a lock, or put task back on queue.
        # cache.add() only stores the key when it is not already present.
        lock_acquired = cache.add(lock_id, True, CACHE_LOCK_EXPIRE)
        print('Lock acquired', lock_acquired)

        if not lock_acquired:
            print('Apply async', self)
            # Re-queue with the full original call; dropping kwargs here
            # would re-run the task with different arguments.
            self.s(*args, **kwargs).apply_async(countdown=3)
            return None

        print('Run', f)
        try:
            # Propagate the task's result instead of discarding it.
            return f(self, *args, **kwargs)
        finally:
            # Release the lock, unless it is about to expire anyway: by then
            # the key may already belong to another execution.
            if monotonic() < timeout_at:
                cache.delete(lock_id)

    return wrapper
from .tasks import sync_order_task
...
@receiver(post_save, sender=Order)
def post_save_order_handler(sender, instance: 'Order', **kwargs):
    """
    Queue ``sync_order_task`` for a saved order.

    Runs on every ``post_save`` of ``Order``. Nothing is queued when
    ``instance.is_created`` is truthy; otherwise the order's primary key is
    handed to ``sync_order_task.delay``.
    """
    # NOTE(review): presumably is_created marks orders that must not be
    # synced again - confirm against the Order model.
    if instance.is_created:
        return
    sync_order_task.delay(instance.pk)
from .decorators import no_simultaneous_execution
...
@app.task(bind=True, name="sync_order_task")
@no_simultaneous_execution
def sync_order_task(self, pk: str):
    """
    Sync the order identified by ``pk`` and retry the task on any failure.

    The order is loaded through the app registry (``apps.get_model``) rather
    than a direct model import. If syncing or logging raises, the error is
    passed to ``_error_handler`` together with the order and the task is
    retried after ``COUNTDOWN_TIMEOUT``.
    """
    # The lookup happens outside the try block, so a failure to load the
    # order propagates directly instead of triggering a retry.
    order = apps.get_model('order', 'Order').objects.get(pk=pk)
    try:
        SyncOrder().sync(order.pk)
        logger.info(
            f'Order #pk: {order.pk} was synced.'
        )
    except Exception as exc:
        _error_handler(exc, order)
        raise self.retry(exc=exc, countdown=COUNTDOWN_TIMEOUT)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment