Skip to content

Instantly share code, notes, and snippets.

@odigity
Created May 14, 2022 17:32
Show Gist options
  • Save odigity/fb109a738f74c8174cbba4e2977dc63d to your computer and use it in GitHub Desktop.
Save odigity/fb109a738f74c8174cbba4e2977dc63d to your computer and use it in GitHub Desktop.
Celery Workflows
───────────────────────────────────────────────────────────────────────────
• First Task Enqueuing Second Task
(T)
└────> (T)
---------------------------------------------------------------------------
pr = p.delay() -> <AsyncResult: fe7c16b9>
pr.children -> [ <AsyncResult: 9cebcbbc> ]
pr.children[0].parent -> None
---------------------------------------------------------------------------
Task p[fe7c16b9]
root_id = fe7c16b9
children = [] ...then... [ <AsyncResult: 9cebcbbc> ]
---------------------------------------------------------------------------
Task c[9cebcbbc]
root_id = fe7c16b9
parent_id = fe7c16b9
---------------------------------------------------------------------------
───────────────────────────────────────────────────────────────────────────
• First Task Enqueuing Multiple Tasks
(T)
├────> (T)
└────> (T)
---------------------------------------------------------------------------
pr = p.delay() -> <AsyncResult: 2585d485>
pr.children -> [ <AsyncResult: e41a3761>, <AsyncResult: ded98925> ]
pr.children[0].parent -> None
---------------------------------------------------------------------------
Task p[2585d485]
root_id = 2585d485
children = [] ...then... [ <AsyncResult: e41a3761>, <AsyncResult: ded98925> ]
---------------------------------------------------------------------------
Task c[e41a3761]
root_id = 2585d485
parent_id = 2585d485
---------------------------------------------------------------------------
Task c[ded98925]
root_id = 2585d485
parent_id = 2585d485
---------------------------------------------------------------------------
───────────────────────────────────────────────────────────────────────────
• First Task Enqueuing Second Task Enqueuing Third Task
(T)
└────> (T)
        └────> (T)
---------------------------------------------------------------------------
gr = g.delay() -> <AsyncResult: ae6f06e3>
gr.children -> [ <AsyncResult: cb25b5c6> ]
gr.children[0].children -> [ <AsyncResult: 0f95dd74> ]
---------------------------------------------------------------------------
Task g[ae6f06e3]
root_id = ae6f06e3
children = [] ...then... [ <AsyncResult: cb25b5c6> ]
---------------------------------------------------------------------------
Task p[cb25b5c6]
root_id = ae6f06e3
parent_id = ae6f06e3
children = [] ...then... [ <AsyncResult: 0f95dd74> ]
---------------------------------------------------------------------------
Task c[0f95dd74]
root_id = ae6f06e3
parent_id = cb25b5c6
---------------------------------------------------------------------------
───────────────────────────────────────────────────────────────────────────
• Task w/ Callback
(T) --> (T)
---------------------------------------------------------------------------
t1r = add.apply_async( [2, 3], link=mul.s(7) ) -> <AsyncResult: 1620c5ff>
t1r.children -> [ <AsyncResult: 5cbfc4f0> ]
t1r.children[0].parent -> None
---------------------------------------------------------------------------
Task add[1620c5ff]
root_id = 1620c5ff
callbacks = [
{ 'task': 'mul', 'args': [7] }
]
children = [] ...then... [ <AsyncResult: 5cbfc4f0> ]
---------------------------------------------------------------------------
Task mul[5cbfc4f0]
root_id = 1620c5ff
parent_id = 1620c5ff
---------------------------------------------------------------------------
───────────────────────────────────────────────────────────────────────────
• Task w/ 2 Callbacks
(T) --> │T│
        │T│
---------------------------------------------------------------------------
t1r = add.apply_async( [2, 3], link=[ mul.s(5), mul.s(6) ] )
-or-
t1 = add.s(2, 3) ; t1.link(mul.s(5)) ; t1.link(mul.s(6)) ; t1r = t1.delay()
-> <AsyncResult: d78b1639>
t1r.children -> [ <GroupResult: d91c9bf8 [e0068e43, 9a0189bf]> ]
t1r.children[0].parent -> None
t1r.children[0].children -> [ <AsyncResult: e0068e43>, <AsyncResult: 9a0189bf> ]
---------------------------------------------------------------------------
Task add[d78b1639]
root_id = d78b1639
callbacks = [
{ 'task': 'mul', 'args': [5] },
{ 'task': 'mul', 'args': [6] }
]
children = [] ...then... [ <GroupResult: d91c9bf8 [e0068e43, 9a0189bf]> ]
---------------------------------------------------------------------------
Task mul[e0068e43]
root_id = d78b1639
parent_id = d78b1639
group = d91c9bf8
---------------------------------------------------------------------------
Task mul[9a0189bf]
root_id = d78b1639
parent_id = d78b1639
group = d91c9bf8
---------------------------------------------------------------------------
───────────────────────────────────────────────────────────────────────────
• Task w/ Callback Group of 2 Tasks
(T) --> │T│
        │T│
---------------------------------------------------------------------------
t1r = _pass_thru.apply_async( [42], link=group( add.s(7), add.s(9) ) )
-> <AsyncResult: a30ea0fe>
t1r.children -> [ <GroupResult: 3f70f38c [1f1f7311, 0144369a]> ]
t1r.children[0].parent -> None
---------------------------------------------------------------------------
Task _pass_thru[a30ea0fe]
root_id = a30ea0fe
callbacks = [
{
'task': 'celery.group',
'kwargs': {
'tasks': [
{ 'task': 'add', 'args': [7] },
{ 'task': 'add', 'args': [9] }
]
},
'subtask_type': 'group'
}
]
children = [] ...then... [ <GroupResult: 3f70f38c [1f1f7311, 0144369a]> ]
---------------------------------------------------------------------------
Task add[1f1f7311]
root_id = a30ea0fe
parent_id = a30ea0fe
group = 3f70f38c
---------------------------------------------------------------------------
Task add[0144369a]
root_id = a30ea0fe
parent_id = a30ea0fe
group = 3f70f38c
---------------------------------------------------------------------------
───────────────────────────────────────────────────────────────────────────
• 3 Linked Tasks
(T) --> (T) --> (T)
---------------------------------------------------------------------------
t1r = add.apply_async( [2, 3], link=mul.signature( [7], link=add.s(8) ) )
-> <AsyncResult: 0f4950b2>
t1r.children -> [ <AsyncResult: 0af053da> ]
t1r.children[0].parent -> None
t1r.children[0].children -> [ <AsyncResult: bbe4bbc8> ]
---------------------------------------------------------------------------
Task add[0f4950b2]
root_id = 0f4950b2
callbacks = [
{ 'task': 'mul', 'args': [7], 'options': { 'link': [
{ 'task': 'add', 'args': [8] }
] } }
]
children = [] ...then... [ <AsyncResult: 0af053da> ]
---------------------------------------------------------------------------
Task mul[0af053da]
root_id = 0f4950b2
parent_id = 0f4950b2
callbacks = [
{ 'task': 'add', 'args': [8] }
]
children = [] ...then... [ <AsyncResult: bbe4bbc8> ]
---------------------------------------------------------------------------
Task add[bbe4bbc8]
root_id = 0f4950b2
parent_id = 0af053da
---------------------------------------------------------------------------
───────────────────────────────────────────────────────────────────────────
• Chain w/ 3 Tasks
(T) ∞∞> (T) ∞∞> (T)
---------------------------------------------------------------------------
t3r = chain( add.s(2, 3), mul.s(7), add.s(8) ).delay()
-or-
t3r = ( add.s(2, 3) | mul.s(7) | add.s(8) ).delay()
-> <AsyncResult: d5d1a636>
t3r.parent -> <AsyncResult: 2b22c45e>
t3r.parent.children -> [ <AsyncResult: d5d1a636> ]
t3r.parent.parent -> <AsyncResult: dbb1b46b>
---------------------------------------------------------------------------
Task add[dbb1b46b]
root_id = dbb1b46b
chain = [
{ 'task': 'add', 'args': [8], 'options': { 'task_id': 'd5d1a636' } },
{ 'task': 'mul', 'args': [7], 'options': { 'task_id': '2b22c45e' } }
]
children = [] ...then... [ <AsyncResult: 2b22c45e> ]
---------------------------------------------------------------------------
Task mul[2b22c45e]
root_id = dbb1b46b
parent_id = dbb1b46b
chain = [
{ 'task': 'add', 'args': [8], 'options': { 'task_id': 'd5d1a636' } }
]
children = [] ...then... [ <AsyncResult: d5d1a636> ]
---------------------------------------------------------------------------
Task add[d5d1a636]
root_id = dbb1b46b
parent_id = 2b22c45e
---------------------------------------------------------------------------
───────────────────────────────────────────────────────────────────────────
• Chain w/ 3 Tasks w/ Callback
(T) ∞∞> (T) ∞∞> (T) --> (T)
---------------------------------------------------------------------------
t3r = chain( add.s(2, 3), mul.s(7), add.s(8) ).apply_async(link=_args.s())
-> <AsyncResult: 0dd34cc5>
t3r.parent -> <AsyncResult: 7f6663db>
t3r.children -> [ <AsyncResult: d9043c87> ]
---------------------------------------------------------------------------
Task add[00f00e5c]
root_id = 00f00e5c
chain = [
{ 'task': 'add', 'args': [8], 'options': { 'task_id': '0dd34cc5', 'link': [
{ 'task': '_args' }
] } },
{ 'task': 'mul', 'args': [7], 'options': { 'task_id': '7f6663db' } }
]
children = [] ...then... [ <AsyncResult: 7f6663db> ]
---------------------------------------------------------------------------
Task mul[7f6663db]
root_id = 00f00e5c
parent_id = 00f00e5c
chain = [
{ 'task': 'add', 'args': [8], 'options': { 'task_id': '0dd34cc5', 'link': [
{ 'task': '_args' }
] } }
]
children = [] ...then... [ <AsyncResult: 0dd34cc5> ]
---------------------------------------------------------------------------
Task add[0dd34cc5]
root_id = 00f00e5c
parent_id = 7f6663db
callbacks = [
{ 'task': '_args' }
]
children = [] ...then... [ <AsyncResult: d9043c87> ]
---------------------------------------------------------------------------
Task _args[d9043c87]
root_id = 00f00e5c
parent_id = 0dd34cc5
---------------------------------------------------------------------------
───────────────────────────────────────────────────────────────────────────
• Group of Tasks
|T|
|T|
|T|
---------------------------------------------------------------------------
gr = group( add.s(2, i) for i in range(3) ).delay()
-> <GroupResult: 09cbbe2b [18f57656, e733d1b4, b5cb5826]>
gr.children -> [ <AsyncResult: 18f57656>, <AsyncResult: e733d1b4>, <AsyncResult: b5cb5826> ]
gr.children[0].parent -> None
---------------------------------------------------------------------------
Task add[18f57656]
root_id = 18f57656
group = 09cbbe2b
---------------------------------------------------------------------------
Task add[e733d1b4]
root_id = e733d1b4
group = 09cbbe2b
---------------------------------------------------------------------------
Task add[b5cb5826]
root_id = b5cb5826
group = 09cbbe2b
---------------------------------------------------------------------------
─────────────────────────────────────────────────────────────────────────── ** AVOID **
• Group of Tasks w/ Callback
|T| --> (T)
|T|
---------------------------------------------------------------------------
g = group( add.s(1, 2), add.s(3, 4) ) ; g.link( _args.s() ) ; gr = g.delay()
-> <GroupResult: b8359c95 [d66bc694, 1dc647c1]>
gr.children -> [ <AsyncResult: d66bc694>, <AsyncResult: 1dc647c1> ]
gr.children[0].children -> [ <AsyncResult: 8171b5ea> ]
---------------------------------------------------------------------------
Task add[d66bc694]
root_id = d66bc694
callbacks = [
{ 'task': '_args', 'immutable': True }
]
group = b8359c95
children = [] ...then... [ <AsyncResult: 8171b5ea> ]
---------------------------------------------------------------------------
Task add[1dc647c1]
root_id = 1dc647c1
group = b8359c95
---------------------------------------------------------------------------
Task _args[8171b5ea]
root_id = d66bc694
parent_id = d66bc694
---------------------------------------------------------------------------
───────────────────────────────────────────────────────────────────────────
• Chord
|T| ┬─> (T)
|T| ┘
---------------------------------------------------------------------------
t3r = chord( group( _pass_thru.s(2), _sleep.s(10) ), _args.s() ).delay()
-or-
t3r = ( group( _pass_thru.s(2), _sleep.s(10) ) | _args.s() ).delay()
-> <AsyncResult: e2ef6ca9>
t3r.parent -> <GroupResult: 3d9f0733 [55d56c9b, 16553193]>
t3r.parent.children -> [ <AsyncResult: 55d56c9b>, <AsyncResult: 16553193> ]
t3r.parent.children[1].children -> []
--------------------------------------------------------------------------- ChordCounter
group_id : 3d9f0733
sub_tasks : [ [["55d56c9b", null], null], [["16553193", null], null] ]
---------------------------------------------------------------------------
Task _pass_thru[55d56c9b]
root_id = 55d56c9b
group = 3d9f0733
chord = { 'task': '_args', 'options': { 'task_id': 'e2ef6ca9' } }
---------------------------------------------------------------------------
Task _sleep[16553193] received
root_id = 16553193
group = 3d9f0733
chord = { 'task': '_args', 'options': { 'task_id': 'e2ef6ca9' } }
children = [] ...then... [ <AsyncResult: e2ef6ca9> ]
---------------------------------------------------------------------------
Task _args[e2ef6ca9]
root_id = 16553193
parent_id = 16553193
---------------------------------------------------------------------------
───────────────────────────────────────────────────────────────────────────
• Chord w/ Callback
|T| ┬─> (T) --> (T)
|T| ┘
---------------------------------------------------------------------------
t3 = ( group( _pass_thru.s(2), _sleep.s(10) ) | _args.s() )
t3.link(_pass_thru.s())
t3r = t3.delay() -> <AsyncResult: 9addbf63>
t3r.parent -> <GroupResult: a9582b6b [94df9a4c, fcbb0445]>
t3r.children -> [ <AsyncResult: 60b2cf22> ]
--------------------------------------------------------------------------- ChordCounter
group_id : a9582b6b
sub_tasks : [ [["94df9a4c", null], null], [["fcbb0445", null], null] ]
---------------------------------------------------------------------------
Task _pass_thru[94df9a4c]
root_id = 94df9a4c
group = a9582b6b
chord = {
'task': '_args', 'options': { 'task_id': '9addbf63', 'link': [
{ 'task': '_pass_thru' }
] }
}
---------------------------------------------------------------------------
Task _sleep[fcbb0445]
root_id = fcbb0445
group = a9582b6b
chord = {
'task': '_args', 'options': { 'task_id': '9addbf63', 'link': [
{ 'task': '_pass_thru' }
] }
}
children = [] ...then... [ <AsyncResult: 9addbf63> ]
---------------------------------------------------------------------------
Task _args[9addbf63]
root_id = fcbb0445
parent_id = fcbb0445
callbacks = [
{ 'task': '_pass_thru' }
]
children = [] ...then... [ <AsyncResult: 60b2cf22> ]
---------------------------------------------------------------------------
Task _pass_thru[60b2cf22]
root_id = fcbb0445
parent_id = 9addbf63
---------------------------------------------------------------------------
───────────────────────────────────────────────────────────────────────────
• Chain of Task-Group-Task aka Task-Chord
(T) ∞∞> |T| ∞∞> (T) -> (T) ∞∞> |T| ┬─> (T)
        |T|                    |T| ┘
---------------------------------------------------------------------------
cr = chain(
_pass_thru.s(2),
group( mul.s(3), mul.s(4) ),
total.s()
).delay() -> <AsyncResult: 8fc69032>
cr.parent -> <GroupResult: bd6d0d73 [e9a3165f, 14786142]>
cr.parent.parent -> <AsyncResult: f0ef9429>
cr.parent.children[0].children -> []
---------------------------------------------------------------------------
Task _pass_thru[f0ef9429]
root_id = f0ef9429
chain = [
{
'task': 'celery.chord',
'kwargs': {
'header': {
'task': 'celery.group',
'kwargs': {
'tasks': [
{
'task': 'mul', 'args': [3], 'options': {
'task_id': 'e9a3165f',
'chord': { 'task': 'total', 'options': { 'task_id': '8fc69032' } },
'group_index': 0
}
},
{
'task': 'mul', 'args': [4], 'options': {
'task_id': '14786142',
'chord': { 'task': 'total', 'options': { 'task_id': '8fc69032' } },
'group_index': 1
}
}
]
},
'options': {
'task_id': 'bd6d0d73',
'chord': { 'task': 'total', 'options': { 'task_id': '8fc69032' } }
},
'subtask_type': 'group'
},
'body': { 'task': 'total', 'options': { 'task_id': '8fc69032' } }
},
'options': { 'task_id': 'bd6d0d73' },
'subtask_type': 'chord'
}
]
children = [] ...then... [ <GroupResult: 22253781 [e9a3165f, 14786142]> ]
---------------------------------------------------------------------------
Task mul[e9a3165f]
root_id = f0ef9429
parent_id = f0ef9429
group = 22253781
chord = { 'task': 'total', 'options': { 'task_id': '8fc69032', 'parent_id': 'f0ef9429', 'root_id': 'f0ef9429' } }
children = [] ...then... [ <AsyncResult: 8fc69032> ]
---------------------------------------------------------------------------
Task mul[14786142]
root_id = f0ef9429
parent_id = f0ef9429
group = 22253781
chord = { 'task': 'total', 'options': { 'task_id': '8fc69032', 'parent_id': 'f0ef9429', 'root_id': 'f0ef9429' } }
---------------------------------------------------------------------------
Task total[8fc69032]
root_id = f0ef9429
parent_id = f0ef9429
---------------------------------------------------------------------------
───────────────────────────────────────────────────────────────────────────
• Chain of Group-Task-Group
|T| ∞∞> (T) ∞∞> |T| -> |T| ┬─> (T) ∞∞> |T|
|T|             |T|    |T| ┘           |T|
---------------------------------------------------------------------------
cr = chain(
group( _pass_thru.s(1), _pass_thru.s(2) ),
total.s(),
group( mul.s(3), mul.s(4) )
).delay()
-> <GroupResult: 1e8a3e44 [322bc6c1, 791b1d56]>
cr.parent -> <AsyncResult: 5366afdc>
cr.parent.children -> [ <GroupResult: 1e8a3e44 [322bc6c1, 791b1d56]> ]
cr.parent.parent -> <GroupResult: 1abc73d7 [33e0989c, 4db7774a]>
---------------------------------------------------------------------------
Task _pass_thru[33e0989c]
root_id = 33e0989c
group = 1abc73d7
chord = {
'task': 'total',
'options': {
'task_id': '5366afdc',
'chain': [
{
'task': 'celery.group',
'kwargs': {
'tasks': [
{ 'task': 'mul', 'args': [3], 'options': { 'task_id': '322bc6c1', 'group_index': 0 } },
{ 'task': 'mul', 'args': [4], 'options': { 'task_id': '791b1d56', 'group_index': 1 } }
]
},
'options': { 'task_id': '1e8a3e44' },
'subtask_type': 'group'
}
]
}
}
children = [] ...then... [ <AsyncResult: 5366afdc> ]
---------------------------------------------------------------------------
Task _pass_thru[4db7774a]
root_id = 4db7774a
group = 1abc73d7
chord = {
'task': 'total',
'options': {
'task_id': '5366afdc',
'chain': [
{
'task': 'celery.group',
'kwargs': {
'tasks': [
{ 'task': 'mul', 'args': [3], 'options': { 'task_id': '322bc6c1', 'group_index': 0 } },
{ 'task': 'mul', 'args': [4], 'options': { 'task_id': '791b1d56', 'group_index': 1 } }
]
},
'options': { 'task_id': '1e8a3e44' },
'subtask_type': 'group'
}
]
}
}
---------------------------------------------------------------------------
Task total[5366afdc]
root_id = 33e0989c
parent_id = 33e0989c
chain = [
{
'task': 'celery.group',
'kwargs': {
'tasks': [
{ 'task': 'mul', 'args': [3], 'options': { 'task_id': '322bc6c1', 'group_index': 0 } },
{ 'task': 'mul', 'args': [4], 'options': { 'task_id': '791b1d56', 'group_index': 1 } }
]
},
'options': { 'task_id': '1e8a3e44' },
'subtask_type': 'group'
}
]
children = [] ...then... [ <GroupResult: 1e8a3e44 [322bc6c1, 791b1d56]> ]
---------------------------------------------------------------------------
Task mul[322bc6c1]
root_id = 33e0989c
parent_id = 5366afdc
group = 1e8a3e44
---------------------------------------------------------------------------
Task mul[791b1d56]
root_id = 33e0989c
parent_id = 5366afdc
group = 1e8a3e44
---------------------------------------------------------------------------
───────────────────────────────────────────────────────────────────────────
• Group w/ a Chain
|T      |
|T ∞∞> T|
---------------------------------------------------------------------------
gr = group( _pass_thru.s(), chain( add.s(5), mul.s(5) ) ).delay(7)
-> <GroupResult: 5982fa5b [40cedd78, f0f8f9e5]>
gr.children[1].parent -> <AsyncResult: 9cc0ddfc>
---------------------------------------------------------------------------
Task _pass_thru[40cedd78]
root_id = 40cedd78
group = 5982fa5b
---------------------------------------------------------------------------
Task add[9cc0ddfc]
root_id = 9cc0ddfc
chain = [
{ 'task': 'mul', 'args': [5], 'options': { 'task_id': 'f0f8f9e5', 'group_id': '5982fa5b' } }
]
children = [] ...then... [ <AsyncResult: f0f8f9e5> ]
---------------------------------------------------------------------------
Task mul[f0f8f9e5]
root_id = 9cc0ddfc
parent_id = 9cc0ddfc
group = 5982fa5b
---------------------------------------------------------------------------
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment