Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Get async celery results from nested subtasks as they complete
from tasks import task1
def get_results(queries, poll_interval=0.1):
    """Yield the result of each nested subtask as it becomes ready.

    Submits ``queries`` to ``task1``, waits for its return value, and
    gathers the ``subtasks`` of every item that value produces when
    iterated. The gathered handles are then polled round-robin: a handle
    whose ``ready()`` is true is removed from the queue and its ``get()``
    value is yielded; any other handle is put back at the end of the queue.

    Args:
        queries: Passed unchanged to ``task1.delay``.
        poll_interval: Seconds to sleep after one complete pass over the
            queue in which no handle was ready. Pass ``0`` to poll
            continuously without sleeping.

    Yields:
        Whatever ``get()`` returns for each finished handle.
    """
    # Imported locally because this script's only top-level import is task1.
    from collections import deque
    import time

    query_procs = task1.delay(queries).get()

    # deque gives O(1) removal from the front; list.pop(0) shifts every item.
    pending = deque()
    for query_proc in query_procs:
        pending.extend(query_proc.subtasks)

    # Number of consecutive handles seen that were not ready yet.
    misses = 0
    while pending:
        current_task = pending.popleft()
        if current_task.ready():
            misses = 0
            yield current_task.get()
            continue

        pending.append(current_task)
        misses += 1
        if misses >= len(pending):
            # Every remaining handle was checked and none had finished:
            # back off instead of calling ready() in a tight spin.
            misses = 0
            if poll_interval > 0:
                time.sleep(poll_interval)
# Demo driver: stream the nested results for three sample queries.
# Guarded so that importing this module does not dispatch any tasks.
if __name__ == "__main__":
    for result in get_results(['a', 'b', 'c']):
        # The parenthesised form prints the same text under Python 2's
        # print statement and is also valid Python 3.
        print(result)
from celery.task import task,TaskSet
import time
import random
@task
def task1(queries):
    """Start one ``task2`` subtask per query, grouped in a ``TaskSet``.

    Returns the value of ``apply_async()`` on that ``TaskSet``.
    """
    # Each query travels as a one-element positional-argument tuple.
    per_query = (task2.subtask((q, )) for q in queries)
    batch = TaskSet(per_query)
    return batch.apply_async()
@task
def task2(query):
    """Start 20 ``task3`` subtasks for ``query``, grouped in a ``TaskSet``.

    Returns the value of ``apply_async()`` on that ``TaskSet``.
    """
    # The positional arguments are wrapped in a tuple, exactly as task1 does
    # for task2. A bare string would be taken as the argument sequence
    # itself, i.e. one positional argument per character, which only
    # happens to work for one-character queries.
    # range() replaces xrange(): identical for 20 items, valid on Python 3.
    return TaskSet(task3.subtask((query, )) for _ in range(20)).apply_async()
@task
def task3(query):
    """Sleep for a random whole number of seconds, then report it.

    Returns a ``(query, seconds)`` tuple, where ``seconds`` is the
    duration slept, drawn from ``random.randint(0, 10)``.
    """
    delay = random.randint(0, 10)
    # Stand-in for real work of unpredictable duration.
    time.sleep(delay)
    return (query, delay)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.