Created
October 4, 2012 22:44
-
-
Save aaronharnly/3836930 to your computer and use it in GitHub Desktop.
celery chain doesn't propagate errors
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# | |
# Good reading: | |
# http://stackoverflow.com/questions/12660994/how-to-fail-the-chain-if-its-sub-task-gives-an-exception | |
# | |
# | |
# Celery instance | |
def create_celery(): | |
from celery import Celery | |
celery = Celery() | |
celery.conf.update( | |
BROKER_URL='amqp://guest@localhost', | |
CELERY_RESULT_BACKEND='amqp', | |
CELERY_TASK_RESULT_EXPIRES=20 | |
) | |
return celery | |
celery = create_celery() | |
# | |
# Task definitions | |
# | |
@celery.task(name='add1') | |
def add1(x): | |
return x + 1 | |
@celery.task(name='mult2') | |
def mult2(x): | |
return x * 2 | |
@celery.task(name='fails') | |
def fails(x): | |
raise AssertionError("This task failed!") | |
def create_chain(fail=False): | |
from celery import chain | |
tasks = [add1.s(), mult2.s()] | |
if fail: | |
tasks.insert(1, fails.s()) | |
return chain(*tasks) | |
# | |
# Publisher | |
# | |
def log(result, indent=0, prefix=""): | |
spaces = indent * " " | |
print spaces, prefix, "result=%s, .status=%s, .results=%s" % (repr(result), result.status, result.result) | |
if result.parent: | |
log(result.parent, indent=indent+4, prefix="parent") | |
def publisher(fail=False): | |
from time import sleep | |
workflow = create_chain(fail) | |
result = workflow.apply_async(args=[3]) | |
while not result.ready(): | |
log(result) | |
sleep(1) | |
log(result) | |
assert result.result == 8 | |
if __name__ == '__main__': | |
from argparse import ArgumentParser | |
parser = ArgumentParser() | |
parser.add_argument('--fail', dest='fail', action='store_true') | |
options = parser.parse_args() | |
publisher(fail=options.fail) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment