Skip to content

Instantly share code, notes, and snippets.

@traut
Last active August 29, 2015 14:19
Show Gist options
  • Save traut/fcf5d76e07dd3ed6e54d to your computer and use it in GitHub Desktop.
Save traut/fcf5d76e07dd3ed6e54d to your computer and use it in GitHub Desktop.
Files needed to reproduce Celery issue
from __future__ import print_function
import pprint
import time
from celery import group, chain, chord
import simple_tasks as t
workflow = chain(
t.task1.s(),
chord(
[
t.task2.s(),
chain(
t.task3.s(),
chord(
[t.task4.s(), t.task5.s()],
t.task6.s()
)
)
],
t.task7.s()
)
)
#workflow = chain(
# t.task1.s(),
# group(
# t.task2.s(),
# chain(
# t.task3.s(),
# group(t.task4.s(), t.task5.s())
# )
# )
#)
result = workflow.delay()
def print_status(result):
print("{} ".format(result.__class__.__name__), end='')
if hasattr(result, 'state'):
print("{} {} ({})".format(result.task_name, result.id, result.state))
else:
print("{}".format(result.id))
if result.parent:
print_status(result.parent)
if result.children:
for r in result.children:
print_status(r)
while not result.ready():
print_status(result)
time.sleep(5)
else:
print_status(result)
$ python flow.py
AsyncResult None 957b64a0-b219-4e91-9bb9-cf43d2ee3a6e (PENDING)
AsyncResult None c772cd4f-775b-4340-9ff9-e306ee84f4e4 (PENDING)
AsyncResult None 957b64a0-b219-4e91-9bb9-cf43d2ee3a6e (PENDING)
AsyncResult None c772cd4f-775b-4340-9ff9-e306ee84f4e4 (STARTED)
AsyncResult None 957b64a0-b219-4e91-9bb9-cf43d2ee3a6e (PENDING)
AsyncResult None c772cd4f-775b-4340-9ff9-e306ee84f4e4 (SUCCESS)
GroupResult 7d427845-8725-49dd-9af6-cca32d35b0e5
AsyncResult None b4603400-cd2b-4a00-bd91-06f21cc04ebc (STARTED)
AsyncResult None 3eadb949-3b36-407d-8a61-b7498c4eece9 (PENDING)
AsyncResult None 957b64a0-b219-4e91-9bb9-cf43d2ee3a6e (PENDING)
AsyncResult None c772cd4f-775b-4340-9ff9-e306ee84f4e4 (SUCCESS)
GroupResult 7d427845-8725-49dd-9af6-cca32d35b0e5
AsyncResult None b4603400-cd2b-4a00-bd91-06f21cc04ebc (SUCCESS)
AsyncResult None 3eadb949-3b36-407d-8a61-b7498c4eece9 (PENDING)
AsyncResult None 957b64a0-b219-4e91-9bb9-cf43d2ee3a6e (PENDING)
AsyncResult None c772cd4f-775b-4340-9ff9-e306ee84f4e4 (SUCCESS)
GroupResult 7d427845-8725-49dd-9af6-cca32d35b0e5
AsyncResult None b4603400-cd2b-4a00-bd91-06f21cc04ebc (SUCCESS)
AsyncResult None 3eadb949-3b36-407d-8a61-b7498c4eece9 (PENDING)
AsyncResult None 957b64a0-b219-4e91-9bb9-cf43d2ee3a6e (PENDING)
AsyncResult None c772cd4f-775b-4340-9ff9-e306ee84f4e4 (SUCCESS)
GroupResult 7d427845-8725-49dd-9af6-cca32d35b0e5
AsyncResult None 957b64a0-b219-4e91-9bb9-cf43d2ee3a6e (PENDING)
AsyncResult None c772cd4f-775b-4340-9ff9-e306ee84f4e4 (SUCCESS)
...
$ celery worker -A simple_tasks -l info
-------------- celery@suzume.local v3.1.17 (Cipater)
---- **** -----
--- * *** * -- Darwin-14.1.0-x86_64-i386-64bit
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: simple_tasks:0x1010ec1d0
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: redis://localhost:6379/0
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. simple_tasks.task1
. simple_tasks.task2
. simple_tasks.task3
. simple_tasks.task4
. simple_tasks.task5
. simple_tasks.task6
. simple_tasks.task7
Connected to redis://localhost:6379/0
mingle: searching for neighbors
mingle: all alone
celery@suzume.local ready.
Received task: simple_tasks.task1[c772cd4f-775b-4340-9ff9-e306ee84f4e4]
task1 start
task1 finish
Received task: simple_tasks.task2[b4603400-cd2b-4a00-bd91-06f21cc04ebc]
Received task: simple_tasks.task3[a1805e5f-302c-4e56-9c19-080036dd347b]
Task simple_tasks.task1[c772cd4f-775b-4340-9ff9-e306ee84f4e4] succeeded in 5.02584462601s: 'result task1'
task2 start
task3 start
task2 finish
task3 finish
Task simple_tasks.task2[b4603400-cd2b-4a00-bd91-06f21cc04ebc] succeeded in 5.00715394001s: 'result task2'
Received task: simple_tasks.task4[8f6c8ad3-b5b3-40c2-bc97-5f9e6a71a4bc]
Task simple_tasks.task3[a1805e5f-302c-4e56-9c19-080036dd347b] succeeded in 5.024643707s: 'result task3'
Received task: simple_tasks.task5[23222361-0d3c-4d5b-b40f-60d17449e343]
task4 start
task5 start
task4 finish
task5 finish
Task simple_tasks.task4[8f6c8ad3-b5b3-40c2-bc97-5f9e6a71a4bc] succeeded in 5.00776234898s: 'result task4'
Received task: simple_tasks.task6[58a3ebcf-6bb1-43dd-8e21-dd3f2a3d6d9f]
Task simple_tasks.task5[23222361-0d3c-4d5b-b40f-60d17449e343] succeeded in 5.528548242s: 'result task5'
task6 start
task6 finish
Task simple_tasks.task6[58a3ebcf-6bb1-43dd-8e21-dd3f2a3d6d9f] succeeded in 5.005484494s: 'result task6'
from __future__ import print_function
import time
from celery import Celery
redis_uri = "redis://%(host)s:%(port)d/%(db)d" % dict(
host = 'localhost',
port = 6379,
db = 0
)
app = Celery(__name__)
app.conf.update(
BROKER_URL = redis_uri,
CELERY_RESULT_BACKEND = redis_uri,
CELERY_TASK_SERIALIZER = 'pickle',
CELERY_RESULT_SERIALIZER = 'pickle',
CELERY_ACCEPT_CONTENT = ['pickle'],
CELERYD_LOG_FORMAT = "%(message)s",
CELERY_TRACK_STARTED = True,
)
def run_task(num):
print("task%d start" % num)
time.sleep(5)
print("task%d finish" % num)
return "result task%d" % num
@app.task
def task1(*args, **kwargs):
return run_task(1)
@app.task
def task2(*args, **kwargs):
return run_task(2)
@app.task
def task3(*args, **kwargs):
return run_task(3)
@app.task
def task4(*args, **kwargs):
return run_task(4)
@app.task
def task5(*args, **kwargs):
return run_task(5)
@app.task
def task6(*args, **kwargs):
return run_task(6)
@app.task
def task7(*args, **kwargs):
return run_task(7)
if __name__ == '__main__':
app.worker_main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment