Skip to content

Instantly share code, notes, and snippets.

@lrvick
Created June 22, 2011 17:02
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save lrvick/1040562 to your computer and use it in GitHub Desktop.
Save lrvick/1040562 to your computer and use it in GitHub Desktop.
Get async celery results from subtasks
from celery.result import AsyncResult
from celery.execute import send_task
def get_results(queries):
result = send_task('task1',queries)
results = result.get()
#this does not return ids until _after_ all the tasks are complete, for some reason.
while results:
#pop first off queue, this will shorten the list and eventually break out of while
first_id = results.pop(0)
r = AsyncResult(first_id)
if not r.ready():
results.append(first_id) #add it back to the bottom o fthe queue
else:
out = r.get()
if out: print out
get_results(['a','b','c'])
from celery.decorators import task
@task
def task1(queries):
task_ids = []
taskset = TaskSet(task2.subtask((query, )) for query in queries).apply_async()
results = taskset.join()
@task
def task2(query):
task_ids = [r.task_id for r in TaskSet(task3.subtask() for _ in xrange(20)).apply_async()]
return task_ids
@task
def task3()
time.sleep(2)
return 'done'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment