Skip to content

Instantly share code, notes, and snippets.

@yaodong
Created March 4, 2017 06:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yaodong/d80710f6791a1a52bbde801bd43fa81c to your computer and use it in GitHub Desktop.
Save yaodong/d80710f6791a1a52bbde801bd43fa81c to your computer and use it in GitHub Desktop.
celery workflow test #temp
from celery import shared_task, chord, chain, group
@shared_task()
def workflow_test(*args, name="", time=2):
print('name [%s] %s' % (name, args))
sleep(time)
return name
chord(
header=group(
chain(workflow_test.s(name='dis 1'), workflow_test.s(name='dipha 1')),
chain(workflow_test.s(name='dis 2'), workflow_test.s(name='dipha 2')),
chain(workflow_test.s(name='dis 3'), workflow_test.s(name='dipha 3'))
),
body=chain(
chord(
header=workflow_test.s(name='dispatch 1'),
body=workflow_test.s(name='dispatch 2'),
),
workflow_test.s(name='find best graph')
),
).delay('xxx.csv', 1233)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment