Skip to content

Instantly share code, notes, and snippets.

Last active July 4, 2019 15:31
Show Gist options
  • Save fermayo/6f009162bf4a9576ddf6721a88b41b13 to your computer and use it in GitHub Desktop.
Save fermayo/6f009162bf4a9576ddf6721a88b41b13 to your computer and use it in GitHub Desktop.
Generic Celery task that calls Django model methods asynchronously
import importlib
from functools import wraps
from myproject.celery import app
@app.task
def call_async_task(obj_module_name, obj_class_name, obj_pk, obj_method, obj_args=None, obj_kwargs=None):
    """Load a model instance by primary key and invoke one of its methods.

    Registered as a Celery task so that ``call_async`` can queue it with
    ``call_async_task.delay(...)``. The target object is identified by
    plain values (module name, class name, primary key) rather than by the
    instance itself, and is re-fetched inside the task before the method
    is called.

    Args:
        obj_module_name: Dotted path of the module that defines the model class.
        obj_class_name: Name of the model class inside that module.
        obj_pk: Primary key passed to ``model_class.objects.get(pk=...)``.
        obj_method: Name of the method to call on the loaded instance.
        obj_args: Positional arguments for the method; ``None`` means none.
        obj_kwargs: Keyword arguments for the method; ``None`` means none.

    Returns:
        Whatever the invoked method returns.

    Raises:
        ImportError: If ``obj_module_name`` cannot be imported.
        AttributeError: If the class or the method does not exist.
        Lookup errors from ``objects.get`` (presumably the model's
        ``DoesNotExist`` -- confirm against the models in use) propagate
        unchanged.
    """
    model_class = getattr(importlib.import_module(obj_module_name), obj_class_name)
    obj = model_class.objects.get(pk=obj_pk)
    method = getattr(obj, obj_method)
    # Normalise the "no arguments" case so the unpacking below always works.
    obj_args = obj_args or []
    obj_kwargs = obj_kwargs or {}
    return method(*obj_args, **obj_kwargs)
def call_async(obj, obj_method, *obj_args, **obj_kwargs):
    """Queue ``obj.<obj_method>(*obj_args, **obj_kwargs)`` via ``call_async_task``.

    The instance is not sent to the task; only its module name, class name
    and primary key are, and the task re-fetches it by primary key.

    Args:
        obj: Model instance whose method should be run; must expose ``pk``.
        obj_method: Name of the method to invoke.
        *obj_args: Positional arguments forwarded to the method.
        **obj_kwargs: Keyword arguments forwarded to the method.

    Returns:
        The value returned by ``call_async_task.delay(...)`` (per the Celery
        documentation, a handle for the queued task).
    """
    return call_async_task.delay(
        obj.__module__,
        obj.__class__.__name__,
        obj.pk,  # third positional argument of call_async_task (obj_pk)
        obj_method,
        obj_args,
        obj_kwargs,
    )
def async_capable(f):
    """Decorate a model method so it can optionally be queued for async execution.

    Calling the decorated method normally runs it in-process. Passing a
    truthy ``async`` keyword argument instead hands the call to
    ``call_async`` and returns that function's result.

    ``async`` is a reserved word from Python 3.7 onwards, so it can only be
    supplied as ``method(**{'async': True})`` there; the alias ``async_=True``
    is accepted as an equivalent spelling. Both keywords are always removed
    before the wrapped method is called.

    Args:
        f: The method to wrap; its first parameter is the instance.

    Returns:
        The wrapping function, carrying ``f``'s name and docstring.
    """
    @wraps(f)
    def func(self, *args, **kwargs):
        # Pop both spellings unconditionally so neither leaks into f's kwargs.
        legacy_flag = kwargs.pop('async', False)
        alias_flag = kwargs.pop('async_', False)
        if legacy_flag or alias_flag:
            # Dispatch on the wrapped method's own name: the worker looks the
            # method up on the instance by this string.
            return call_async(self, f.__name__, *args, **kwargs)
        return f(self, *args, **kwargs)
    return func
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment