Skip to content

Instantly share code, notes, and snippets.

@ask
Created August 31, 2011 12:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ask/1183399 to your computer and use it in GitHub Desktop.
Save ask/1183399 to your computer and use it in GitHub Desktop.
traverse celery results
from collections import deque
from celery.result import BaseAsyncResult, TaskSetResult
from celery.task import chord, task, TaskSet
def force_list(l):
if not isinstance(l, (list, tuple)):
return [l]
return l
def traverse(start):
stack = deque([start])
while stack:
for subres in force_list(stack.popleft()):
if isinstance(subres, TaskSetResult):
stack.append(subres.join())
elif isinstance(subres, BaseAsyncResult):
stack.append(subres.get())
else:
yield subres
@task
def tA():
return tB.apply_async()
@task
def tB():
return TaskSet(tC.subtask((i, )) for i in xrange(30)).apply_async()
@task
def tC(i):
return chord(tD.subtask((i, )) for i in xrange(i))(tS.subtask())
@task
def tD(i):
return i ** i
@task
def tS(numbers):
return sum(numbers)
def test():
for res in traverse(tA.apply_async()):
print(res)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment