Skip to content

Instantly share code, notes, and snippets.

@fermayo
Last active July 4, 2019 15:31
Show Gist options
  • Save fermayo/6f009162bf4a9576ddf6721a88b41b13 to your computer and use it in GitHub Desktop.
Save fermayo/6f009162bf4a9576ddf6721a88b41b13 to your computer and use it in GitHub Desktop.
Generic Celery task that calls Django model methods asynchronously
import importlib
from functools import wraps
from myproject.celery import app
@app.task
def call_async_task(obj_module_name, obj_class_name, obj_pk, obj_method, obj_args=None, obj_kwargs=None):
    """Look up a model instance by primary key and invoke one of its methods.

    Every argument is a plain value (names, a primary key, a list and a dict)
    rather than a live object: the instance is re-fetched inside the task
    through ``<class>.objects.get(pk=obj_pk)``.

    Args:
        obj_module_name: Dotted path of the module that defines the class.
        obj_class_name: Name of the class inside that module.
        obj_pk: Primary key passed to ``objects.get``.
        obj_method: Name of the method to call on the fetched instance.
        obj_args: Positional arguments for the method; a falsy value means none.
        obj_kwargs: Keyword arguments for the method; a falsy value means none.

    Returns:
        Whatever the invoked method returns.
    """
    # Falsy values (None, empty containers) collapse to fresh empty containers.
    positional = obj_args if obj_args else []
    keyword = obj_kwargs if obj_kwargs else {}

    module = importlib.import_module(obj_module_name)
    model_cls = getattr(module, obj_class_name)
    instance = model_cls.objects.get(pk=obj_pk)
    bound_method = getattr(instance, obj_method)
    return bound_method(*positional, **keyword)
def call_async(obj, obj_method, *obj_args, **obj_kwargs):
    """Queue ``obj.<obj_method>(*obj_args, **obj_kwargs)`` through ``call_async_task``.

    Only the object's module name, class name and primary key are sent, so the
    receiving side re-loads the instance instead of getting a copy of ``obj``.

    Args:
        obj: Instance whose method should be run; must expose ``pk``.
        obj_method: Name of the method to call.
        *obj_args: Positional arguments forwarded to the method.
        **obj_kwargs: Keyword arguments forwarded to the method.

    Returns:
        The value returned by ``call_async_task.delay`` (for a Celery task this
        is the result handle), so callers can track the queued call.

    Raises:
        ValueError: If ``obj.pk`` is ``None``; the instance could not be
            looked up again by primary key when the task runs.
    """
    if obj.pk is None:
        # Fail fast here: otherwise the task is queued and can only fail later,
        # far away from the call site that caused it.
        raise ValueError(
            'Cannot call %r asynchronously on an object without a primary key' % (obj_method,)
        )
    return call_async_task.delay(
        obj.__module__,
        obj.__class__.__name__,
        obj.pk,
        obj_method,
        obj_args,
        obj_kwargs,
    )
def async_capable(f):
    """Decorate a method so callers can opt into running it through ``call_async``.

    The wrapped method runs inline by default. When the caller passes a truthy
    ``async_`` keyword argument, the call is handed to ``call_async`` with the
    method's name and the remaining arguments, and that result is returned.

    The original flag name ``async`` is still honoured for backward
    compatibility. Because ``async`` is a reserved word from Python 3.7 on, it
    can only be supplied as ``**{'async': True}``; ``async_=True`` is the
    spelling that works as an ordinary keyword argument.

    Args:
        f: The method to wrap; its first parameter is the instance.

    Returns:
        The wrapping function, carrying ``f``'s metadata via ``functools.wraps``.
    """
    @wraps(f)
    def func(self, *args, **kwargs):
        # Pop both spellings unconditionally so neither flag leaks into the
        # wrapped method's keyword arguments.
        legacy_flag = kwargs.pop('async', False)
        modern_flag = kwargs.pop('async_', False)
        if legacy_flag or modern_flag:
            # The flag has been removed, so when this method is later invoked
            # by name with the same arguments it takes the inline branch below.
            return call_async(self, func.__name__, *args, **kwargs)
        return f(self, *args, **kwargs)
    return func
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment