@pipermerriam
Last active January 1, 2016 08:29
celery chaining issue

import itertools

# Assumes `celery` is the top-level package, which exposes `task`, `chain`,
# `group`, and `chord`.
import celery


@celery.task
def first():
    # generates an iterable
    return 1, 2, 3


@celery.task
def second(thing):
    """
    Takes a single item from the iterable from `first`, does some computation,
    and returns an iterable.
    There will be one `second` task for each item in the iterable returned by
    `first`.
    """
    return [data for data in range(thing)]


@celery.task
def third(data):
    """
    Takes a single item from the iterable from `second`, does some computation,
    and returns an iterable.
    """
    return 1  # result


@celery.task
def fourth(results):
    """
    Takes the results of all `third` tasks, and does some processing on them.
    """
    # Debug output; the parenthesized form works on both Python 2 and 3.
    print("####################################")
    print('results %s' % (results,))
    print("####################################")
    return sum(itertools.chain(*results.iterate()))


@celery.task
def map_to_second(things):
    """
    This concept works pretty well if there is only one level of 'chording',
    but in my situation, I have two. How do I have the chord from the top level
    """
    # One chain per item from `first`; the chord body `fourth` should receive
    # the combined results.
    return celery.chord(
        celery.chain(second.s(thing), map_to_third.s()) for thing in things
    )(fourth.s())


@celery.task
def map_to_third(list_of_data):
    """
    This function doesn't have enough 'context' to feed the results into
    `fourth`.
    """
    return celery.group(
        third.s(data) for data in list_of_data
    ).apply_async()


@celery.task
def doit():
    return celery.chain(
        first.s(),
        map_to_second.s(),
    ).apply_async()
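
For reference, the data flow that the task graph above is meant to produce can be sketched synchronously, with no Celery involved. This is only an illustration of the intended fan-out and fan-in, not a fix for the nested-chord problem. The `*_plain` names and `run_pipeline` are hypothetical helpers that mirror `first` through `fourth`, and `third_plain` is assumed to return a one-item list so that the flattening step in `fourth` has iterables to chain.

```python
import itertools


def first_plain():
    # Mirrors `first`: produces the top-level iterable.
    return 1, 2, 3


def second_plain(thing):
    # Mirrors `second`: one call per item from `first_plain`.
    return list(range(thing))


def third_plain(data):
    # Mirrors `third`; assumed here to return an iterable (a one-item list).
    return [1]


def fourth_plain(results):
    # Mirrors `fourth`: flattens every `third_plain` result and sums them.
    return sum(itertools.chain.from_iterable(results))


def run_pipeline():
    # Two levels of fan-out (first -> second -> third), then a single fan-in.
    third_results = [
        third_plain(data)
        for thing in first_plain()
        for data in second_plain(thing)
    ]
    return fourth_plain(third_results)
```

With the inputs above, `second_plain` yields 1 + 2 + 3 = 6 items in total, so `run_pipeline()` returns 6. That is the value `fourth` would be expected to compute once both levels of mapping feed into it.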