Skip to content

Instantly share code, notes, and snippets.

@brouberol
Last active December 21, 2015 10:40
Show Gist options
  • Save brouberol/051447e65550bf1bb1a3 to your computer and use it in GitHub Desktop.
Save brouberol/051447e65550bf1bb1a3 to your computer and use it in GitHub Desktop.
ChainedTask celery abstract task
from celery import Task
class ChainedTask(Task):
    """Abstract Celery task that turns a leading dict argument into kwargs.

    Intended as a ``base`` class for tasks that are linked together: when the
    first positional argument handed to ``apply_async`` is a dict (presumably
    the return value of the previous task in a chain -- confirm against the
    caller), its items are merged into the keyword arguments instead of being
    passed positionally.
    """

    abstract = True

    def apply_async(self, args=None, kwargs=None, **options):
        """Inject the dict return value of a task as keyword args of the next one.

        Args:
            args: Positional arguments for the task (tuple, list or ``None``).
                If the first argument is a dict, or the second one when the
                first is a ``Task`` instance, that dict is removed from the
                positional arguments and merged into ``kwargs``.
            kwargs: Keyword arguments for the task (dict or ``None``). Keys
                coming from the injected dict override keys already present.
            **options: Forwarded untouched to ``Task.apply_async``.

        Returns:
            Whatever ``Task.apply_async`` returns.
        """
        # Work on copies: ``args`` is frequently a tuple (no ``pop``), either
        # argument may be ``None``, and the caller's objects must not be
        # mutated behind its back.
        positional = list(args) if args else []
        keywords = dict(kwargs) if kwargs else {}

        # Handle the case of a bound task (decorated with bind=True): the task
        # instance comes first, so the candidate dict sits one slot further.
        dict_index = 1 if positional and isinstance(positional[0], Task) else 0

        if len(positional) > dict_index and isinstance(positional[dict_index], dict):
            keywords.update(positional.pop(dict_index))

        return super(ChainedTask, self).apply_async(
            tuple(positional), keywords, **options
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment