Skip to content

Instantly share code, notes, and snippets.

@tsoporan
Forked from lrvick/results.py
Created June 23, 2011 03:44
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save tsoporan/1041862 to your computer and use it in GitHub Desktop.
Save tsoporan/1041862 to your computer and use it in GitHub Desktop.
Get async celery results from nested subtasks as they complete
from tasks import task1
def get_results(queries, poll_interval=0.1):
    """Yield the result of every nested subtask as soon as it is ready.

    Dispatches ``task1`` with *queries*, waits for its return value, then
    collects the ``subtasks`` of each item in that value and polls them
    until all have been yielded.

    Args:
        queries: Passed through unchanged to ``task1.delay``.
        poll_interval: Seconds to sleep after a complete pass over the
            pending subtasks found none of them ready. Prevents the loop
            from busy-spinning against the result backend.

    Yields:
        Whatever ``.get()`` returns for each finished subtask, in
        completion order (as observed by polling), not submission order.
    """
    # Local imports keep this block self-contained.
    from collections import deque
    import time

    # NOTE(review): presumably a TaskSetResult whose items each expose
    # ``.subtasks`` -- confirm against the Celery version in use.
    query_procs = task1.delay(queries).get()

    # deque gives O(1) pops from the left; list.pop(0) is O(n).
    pending = deque()
    for query_proc in query_procs:
        pending.extend(query_proc.subtasks)

    misses = 0  # consecutive not-ready checks since the last yield/sleep
    while pending:
        current_task = pending.popleft()
        if current_task.ready():
            misses = 0
            yield current_task.get()
        else:
            # Not done yet: rotate to the back and look at the next one.
            pending.append(current_task)
            misses += 1
            if misses >= len(pending):
                # Every pending subtask was checked and none was ready;
                # back off instead of spinning at full speed.
                time.sleep(poll_interval)
                misses = 0
# Demo driver: stream each nested subtask result as it completes.
for result in get_results(['a','b','c']):
    # Parenthesised single-argument print: identical output on Python 2,
    # and valid syntax on Python 3 (the bare print statement is not).
    print(result)
from celery.task import task,TaskSet
import time
import random
@task
def task1(queries):
    """Launch one ``task2`` subtask per query and return the TaskSet result.

    Args:
        queries: Iterable of queries; each becomes the single positional
            argument of a ``task2`` subtask.

    Returns:
        The value of ``TaskSet(...).apply_async()`` for the launched set.
    """
    # One signature per query, each carrying the query as a 1-tuple of args.
    per_query_jobs = (task2.subtask((item, )) for item in queries)
    job_set = TaskSet(per_query_jobs)
    return job_set.apply_async()
@task
def task2(query):
    """Launch 20 ``task3`` subtasks for *query* and return the TaskSet result.

    Args:
        query: Forwarded as the single positional argument of every
            ``task3`` subtask.

    Returns:
        The value of ``TaskSet(...).apply_async()`` for the launched set.
    """
    # The positional args must be a tuple, matching how task1 builds its
    # subtasks. Passing the bare query would treat it as the args sequence
    # itself, so a multi-character string would be split into several
    # positional arguments.
    # ``range`` instead of ``xrange`` keeps the block runnable on Python 3;
    # for 20 items the difference on Python 2 is negligible.
    return TaskSet(task3.subtask((query, )) for _ in range(20)).apply_async()
@task
def task3(query):
    """Sleep for a random whole number of seconds, then echo the query.

    Args:
        query: Value returned unchanged as the first tuple element.

    Returns:
        A ``(query, seconds)`` tuple, where ``seconds`` is the integer
        delay drawn from ``random.randint(0, 10)`` that was slept.
    """
    # Simulated work of random duration, so subtasks finish out of order.
    delay = random.randint(0, 10)
    time.sleep(delay)
    return (query, delay)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment