Skip to content

Instantly share code, notes, and snippets.

@aaronharnly
Created October 4, 2012 22:44
Show Gist options
  • Save aaronharnly/3836930 to your computer and use it in GitHub Desktop.
Save aaronharnly/3836930 to your computer and use it in GitHub Desktop.
celery chain doesn't propagate errors
#!/usr/bin/env python
#
# Good reading:
# http://stackoverflow.com/questions/12660994/how-to-fail-the-chain-if-its-sub-task-gives-an-exception
#
#
# Celery instance
def create_celery():
    """Build and configure the Celery application used by this demo.

    The app talks to an AMQP broker on localhost as the ``guest`` user,
    stores task results through the ``amqp`` result backend, and sets
    ``CELERY_TASK_RESULT_EXPIRES`` to 20.

    Returns:
        The configured ``Celery`` instance.
    """
    # Imported lazily, matching the rest of this script.
    from celery import Celery

    settings = {
        'BROKER_URL': 'amqp://guest@localhost',
        'CELERY_RESULT_BACKEND': 'amqp',
        'CELERY_TASK_RESULT_EXPIRES': 20,
    }
    app = Celery()
    app.conf.update(**settings)
    return app


# Module-level application object; the task decorators below hang off it.
celery = create_celery()
#
# Task definitions
#
@celery.task(name='add1')
def add1(x):
    """Task registered as ``add1``: return ``x`` incremented by one."""
    incremented = x + 1
    return incremented
@celery.task(name='mult2')
def mult2(x):
    """Task registered as ``mult2``: return ``x`` multiplied by two."""
    doubled = x * 2
    return doubled
@celery.task(name='fails')
def fails(x):
    """Task registered as ``fails``: always raise ``AssertionError``.

    The argument ``x`` is accepted so the task can sit in the middle of a
    chain, but it is never used.
    """
    message = "This task failed!"
    raise AssertionError(message)
def create_chain(fail=False):
    """Assemble the demo workflow as a Celery chain (not yet applied).

    Args:
        fail: when true, the always-failing ``fails`` task is placed
            between ``add1`` and ``mult2``.

    Returns:
        A ``chain`` of ``add1 -> mult2``, or ``add1 -> fails -> mult2``
        when ``fail`` is true.
    """
    from celery import chain

    # Build the signature list in execution order.
    steps = [add1.s()]
    if fail:
        steps.append(fails.s())
    steps.append(mult2.s())
    return chain(*steps)
#
# Publisher
#
def log(result, indent=0, prefix=""):
    """Print one line describing ``result`` and, recursively, its parents.

    Each line shows the repr of the result object, its ``status`` and its
    ``result`` value. Parents (``result.parent``) are printed on following
    lines, indented by four more spaces per level and prefixed with
    ``parent``.

    Args:
        result: object exposing ``status``, ``result`` and ``parent``
            attributes (here, the async result of a chain link).
        indent: number of spaces to put in front of the line.
        prefix: label printed between the indentation and the details.
    """
    spaces = indent * " "
    # A single pre-formatted string passed to print(...) parses on both
    # Python 2 and Python 3 and yields exactly what the former
    # `print spaces, prefix, "..."` statement produced (space-separated).
    print("%s %s result=%r, .status=%s, .results=%s" % (
        spaces, prefix, result, result.status, result.result))
    parent = result.parent
    if parent is not None:
        log(parent, indent=indent + 4, prefix="parent")
def _first_failed_parent(result):
    """Return the nearest ancestor of ``result`` that has failed, or None."""
    link = result.parent
    while link is not None:
        if link.failed():
            return link
        link = link.parent
    return None


def publisher(fail=False):
    """Run the demo chain with the argument 3 and check its outcome.

    The chain is applied asynchronously and polled once per second, with
    the state of every link logged on each poll and once more at the end.

    Args:
        fail: forwarded to ``create_chain``; when true the chain contains
            the always-failing task.

    Raises:
        RuntimeError: an earlier link of the chain failed, so the final
            result will never become ready.
        AssertionError: the chain finished but its result is not 8,
            i.e. ``(3 + 1) * 2``.
    """
    from time import sleep

    workflow = create_chain(fail)
    result = workflow.apply_async(args=[3])

    # A failure in an earlier link is not propagated to the final result,
    # which then never becomes ready. Polling on ready() alone would spin
    # forever, so the parents are watched as well.
    broken = _first_failed_parent(result)
    while broken is None and not result.ready():
        log(result)
        sleep(1)
        broken = _first_failed_parent(result)
    log(result)

    if broken is not None:
        raise RuntimeError(
            "chain aborted: upstream task %r failed with %r"
            % (broken, broken.result))
    # Explicit raise instead of `assert` so the check survives `python -O`.
    if result.result != 8:
        raise AssertionError("expected 8, got %r" % (result.result,))
if __name__ == '__main__':
    # Command-line entry point: `--fail` puts the failing task in the chain.
    from argparse import ArgumentParser

    cli = ArgumentParser()
    cli.add_argument('--fail', dest='fail', action='store_true')
    parsed = cli.parse_args()
    publisher(fail=parsed.fail)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment