Skip to content

Instantly share code, notes, and snippets.

@jonashaag
Last active March 28, 2022 23:14
Show Gist options
  • Save jonashaag/3b56e831839243e5391ff6ddec85dbdf to your computer and use it in GitHub Desktop.
Save jonashaag/3b56e831839243e5391ff6ddec85dbdf to your computer and use it in GitHub Desktop.
Celery Django model serializer – pass Django model instances as arguments to Celery tasks
"""Our convenience Celery Task wrapper that allows us to conveniently pass
model instances as Task arguments.
It "serializes" model instances to IDs and "deserializes" these IDs to model
instances upon task execution.
The serialized representation of a model instance is (marker, app_label, model_name, pk).
"""
import celery
from django.apps import apps
from django.db import models
from kombu.utils.objects import cached_property
# Sentinel stored as the first element of a serialized model reference; it is
# what unwrap_model_argument() looks for to recognize such a reference.
MODEL_ARGUMENT_MARKER = '__model_instance__'
def wrap_model_argument(arg):
    """Turn a Django model instance into a serializable reference.

    A model instance becomes the tuple
    ``(MODEL_ARGUMENT_MARKER, app_label, model_name, pk)``.  Every other
    value is handed back unchanged.
    """
    if not isinstance(arg, models.Model):
        return arg
    meta = arg._meta
    return (MODEL_ARGUMENT_MARKER, meta.app_label, meta.model_name, arg.pk)
def unwrap_model_argument(arg):
    """Resolve a serialized model reference back into a model instance.

    A reference is a list or tuple of exactly four items whose first item
    equals ``MODEL_ARGUMENT_MARKER``; the remaining items are the app label,
    the model name and the primary key.  The model class is looked up through
    the Django app registry and the instance is fetched by primary key.  Any
    exception raised by that lookup propagates to the caller.

    Anything that is not a reference is returned unchanged.
    """
    looks_like_reference = (
        isinstance(arg, (list, tuple))
        and len(arg) == 4
        and arg[0] == MODEL_ARGUMENT_MARKER
    )
    if not looks_like_reference:
        return arg

    _marker, app_label, model_name, pk = arg
    model_class = apps.get_model(app_label, model_name)
    return model_class.objects.get(pk=pk)
class ModelArgumentTask(celery.Task):
    """Celery task base class that lets callers pass Django model instances.

    On the calling side (``apply_async`` / ``signature``) model instances
    among the arguments are replaced by serializable references.  When the
    task body runs (``__call__``) those references are resolved back into
    instances.  A model instance returned by the task is turned into a
    reference the same way and resolved again by ``AsyncResult(...).get()``.

    Only top-level positional and keyword arguments (and the top-level return
    value) are converted; model instances nested inside other containers are
    passed through untouched.
    """

    def __call__(self, *args, **kwargs):
        """Run the task with every model reference resolved to an instance.

        The return value is wrapped again, so a returned model instance is
        handed on as a reference rather than as the instance itself.
        """
        result = super().__call__(
            *[unwrap_model_argument(arg) for arg in args],
            **{k: unwrap_model_argument(v) for k, v in kwargs.items()})
        return wrap_model_argument(result)

    def apply_async(self, args=None, kwargs=None, *apply_args, **apply_kwargs):
        """Queue the task, replacing model instances in ``args``/``kwargs``
        with references.

        ``args`` and ``kwargs`` may be ``None``; all remaining parameters are
        forwarded to the parent implementation unchanged.
        """
        return super().apply_async(
            [wrap_model_argument(arg) for arg in args or ()],
            {k: wrap_model_argument(v) for k, v in (kwargs or {}).items()},
            *apply_args, **apply_kwargs)

    def signature(self, args=None, kwargs=None, *starargs, **starkwargs):
        """Build a signature whose model-instance arguments are references.

        ``args`` and ``kwargs`` default to ``None``, exactly like
        ``apply_async``, so ``task.signature()`` and ``task.signature((x,))``
        work instead of failing on a missing positional parameter.  All
        remaining parameters are forwarded to the parent implementation
        unchanged.
        """
        return super().signature(
            [wrap_model_argument(arg) for arg in args or ()],
            {k: wrap_model_argument(v) for k, v in (kwargs or {}).items()},
            *starargs, **starkwargs)

    def AsyncResult(self, task_id, **kwargs):
        """Return a result handle for ``task_id`` whose ``get()`` resolves a
        model reference back into an instance."""
        return self.AsyncResult_wrapper(
            task_id, backend=self.backend, task_name=self.name, **kwargs)

    @cached_property
    def AsyncResult_wrapper(self):
        """Subclass of the app's ``AsyncResult`` that unwraps ``get()`` results.

        Built from ``self._get_app().AsyncResult`` on first access and cached
        on the task by ``cached_property``.
        """
        class ModelArgumentAsyncResult(self._get_app().AsyncResult):
            def get(self, *args, **kwargs):
                return unwrap_model_argument(super().get(*args, **kwargs))

        return ModelArgumentAsyncResult
def task(*args, **kwargs):
    """Replacement for the ``celery.task`` decorator using ``ModelArgumentTask``.

    Two call forms are supported:

    * bare -- ``@task`` / ``task(func)``: exactly one positional argument,
      which is the function to turn into a task;
    * parametrized -- ``@task(**options)``: returns the decorator.

    In both forms the keyword options are forwarded to ``celery.task``, so
    ``task(func, bind=True)`` no longer drops ``bind`` silently.

    Raises:
        TypeError: if ``base`` is passed.  The base class is always
            ``ModelArgumentTask``; the check is an explicit raise so that it
            is still enforced when Python runs with ``-O``.
    """
    if 'base' in kwargs:
        raise TypeError(
            "task() does not accept 'base'; ModelArgumentTask is always used")
    kwargs['base'] = ModelArgumentTask

    if len(args) == 1:
        # Bare form: build the configured decorator, then apply it at once.
        return celery.task(**kwargs)(args[0])
    return celery.task(*args, **kwargs)
@stevelacey
Copy link

This looks great, but I couldn't make it work. Is it incompatible with queuing jobs via send_task?

Seems like celery barfs trying to route the task before it ever runs apply_async.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment