public int trySetChildrenBlockedToReadyOrShortCircuitPlannedOrCanceled(long taskId)
{
return handle.createStatement("update tasks" +
" set updated_at = now(), state = case" +
" when task_type = " + TaskType.GROUPING_ONLY + " then " + TaskStateCode.PLANNED_CODE +
" when " + bitAnd("state_flags", Integer.toString(TaskStateFlags.CANCEL_REQUESTED)) + " != 0 then " + TaskStateCode.CANCELED_CODE +
" else " + TaskStateCode.READY_CODE +
" end" +
" where state = " + TaskStateCode.BLOCKED_CODE +
" and parent_id = :parentId" +
" and exists (" +
"select * from tasks pt" +
" where pt.id = tasks.parent_id" +
" and pt.state in (" + Stream.of(
TaskStateCode.canRunChildrenStates()
).map(it -> Short.toString(it.get())).collect(Collectors.joining(", ")) + ")" +
" )" +
" and not exists (" +
"select * from tasks up" +
" join task_dependencies dep on up.id = dep.upstream_id" +
" where dep.downstream_id = tasks.id" +
" and up.state not in (" + Stream.of(
TaskStateCode.canRunDownstreamStates()
).map(it -> Short.toString(it.get())).collect(Collectors.joining(", ")) + ")" +
")")
.bind("parentId", taskId)
.execute();
}
LOG: execute <unnamed>:
update tasks set updated_at = now(),
state = case
when task_type = 1 then 5
when state_flags & 1 != 0 then 9
else 1
end
where state = 0 and parent_id = $1
and exists (select * from tasks pt where pt.id = tasks.parent_id and pt.state in (5, 7) )
and not exists (
select * from tasks up
join task_dependencies dep on up.id = dep.upstream_id
where dep.downstream_id = tasks.id and up.state not in (7)
)
DETAIL: parameters: $1 = '69'
+task0:
+task0_1:
echo>: task0~~~
+task0_2:
sh>: sleep 2
+task1:
+task1_1:
echo>: task1_2!!!!
+task1_2:
echo>: task1_3!!!!
+task1_3:
sh>: sleep 2
+task2:
loop>: 3
_do:
echo>: task2====
public enum TaskStateCode
{
BLOCKED(0),
READY(1),
RETRY_WAITING(2),
GROUP_RETRY_WAITING(3),
RUNNING(4),
PLANNED(5),
GROUP_ERROR(6),
SUCCESS(7),
ERROR(8),
CANCELED(9);
digdag=# select * from tasks order by id; select * from task_dependencies order by id;
id | attempt_id | parent_id | task_type | state | state_flags | updated_at | retry_at | state_params | retry_count | started_at
----+------------+-----------+-----------+-------+-------------+-------------------------------+----------+--------------+-------------+-------------------------------
83 | 5 | | 1 | 5 | 8 | 2017-03-17 18:31:40.755681+09 | | | 0 |
84 | 5 | 83 | 1 | 5 | 8 | 2017-03-17 18:31:40.79876+09 | | | 0 |
85 | 5 | 84 | 0 | 4 | 8 | 2017-03-17 18:31:40.808256+09 | | | 0 | 2017-03-17 18:31:40.808256+09
86 | 5 | 84 | 0 | 0 | 8 | 2017-03-17 18:31:40.755681+09 | | | 0 |
87 | 5 | 83 | 1 | 0 | 8 | 2017-03-17 18:31:40.755681+09 | | | 0 |
88 | 5 | 87 | 0 | 0 | 8 | 2017-03-17 18:31:40.755681+09 | | | 0 |
89 | 5 | 87 | 0 | 0 | 8 | 2017-03-17 18:31:40.755681+09 | | | 0 |
90 | 5 | 87 | 0 | 0 | 8 | 2017-03-17 18:31:40.755681+09 | | | 0 |
91 | 5 | 83 | 0 | 0 | 8 | 2017-03-17 18:31:40.755681+09 | | | 0 |
(9 rows)
id | upstream_id | downstream_id
----+-------------+---------------
61 | 85 | 86
62 | 84 | 87
63 | 88 | 89
64 | 89 | 90
65 | 87 | 91
(5 rows)
digdag=# select * from tasks order by id; select * from task_dependencies order by id;
id | attempt_id | parent_id | task_type | state | state_flags | updated_at | retry_at | state_params | retry_count | started_at
----+------------+-----------+-----------+-------+-------------+-------------------------------+----------+--------------+-------------+-------------------------------
83 | 5 | | 1 | 5 | 8 | 2017-03-17 18:31:40.755681+09 | | | 0 |
84 | 5 | 83 | 1 | 5 | 8 | 2017-03-17 18:31:40.79876+09 | | | 0 |
85 | 5 | 84 | 0 | 7 | 8 | 2017-03-17 18:31:42.775882+09 | | | 0 | 2017-03-17 18:31:40.808256+09
86 | 5 | 84 | 0 | 4 | 8 | 2017-03-17 18:31:42.782431+09 | | | 0 | 2017-03-17 18:31:42.782431+09
87 | 5 | 83 | 1 | 0 | 8 | 2017-03-17 18:31:40.755681+09 | | | 0 |
88 | 5 | 87 | 0 | 0 | 8 | 2017-03-17 18:31:40.755681+09 | | | 0 |
89 | 5 | 87 | 0 | 0 | 8 | 2017-03-17 18:31:40.755681+09 | | | 0 |
90 | 5 | 87 | 0 | 0 | 8 | 2017-03-17 18:31:40.755681+09 | | | 0 |
91 | 5 | 83 | 0 | 0 | 8 | 2017-03-17 18:31:40.755681+09 | | | 0 |
(9 rows)
id | upstream_id | downstream_id
----+-------------+---------------
61 | 85 | 86
62 | 84 | 87
63 | 88 | 89
64 | 89 | 90
65 | 87 | 91
(5 rows)
digdag=# select * from tasks order by id; select * from task_dependencies order by id;
id | attempt_id | parent_id | task_type | state | state_flags | updated_at | retry_at | state_params | retry_count | started_at
----+------------+-----------+-----------+-------+-------------+-------------------------------+----------+--------------+-------------+-------------------------------
83 | 5 | | 1 | 5 | 8 | 2017-03-17 18:31:40.755681+09 | | | 0 |
84 | 5 | 83 | 1 | 5 | 8 | 2017-03-17 18:31:40.79876+09 | | | 0 |
85 | 5 | 84 | 0 | 7 | 8 | 2017-03-17 18:31:42.775882+09 | | | 0 | 2017-03-17 18:31:40.808256+09
86 | 5 | 84 | 0 | 4 | 8 | 2017-03-17 18:31:42.782431+09 | | | 0 | 2017-03-17 18:31:42.782431+09
87 | 5 | 83 | 1 | 0 | 8 | 2017-03-17 18:31:40.755681+09 | | | 0 |
88 | 5 | 87 | 0 | 0 | 8 | 2017-03-17 18:31:40.755681+09 | | | 0 |
89 | 5 | 87 | 0 | 0 | 8 | 2017-03-17 18:31:40.755681+09 | | | 0 |
90 | 5 | 87 | 0 | 0 | 8 | 2017-03-17 18:31:40.755681+09 | | | 0 |
91 | 5 | 83 | 0 | 0 | 8 | 2017-03-17 18:31:40.755681+09 | | | 0 |
(9 rows)
id | upstream_id | downstream_id
----+-------------+---------------
61 | 85 | 86
62 | 84 | 87
63 | 88 | 89
64 | 89 | 90
65 | 87 | 91
(5 rows)
digdag=# select * from tasks order by id; select * from task_dependencies order by id;
id | attempt_id | parent_id | task_type | state | state_flags | updated_at | retry_at | state_params | retry_count | started_at
----+------------+-----------+-----------+-------+-------------+-------------------------------+----------+--------------+-------------+-------------------------------
83 | 5 | | 1 | 5 | 8 | 2017-03-17 18:31:40.755681+09 | | | 0 |
84 | 5 | 83 | 1 | 7 | 8 | 2017-03-17 18:31:46.881902+09 | | | 0 |
85 | 5 | 84 | 0 | 7 | 8 | 2017-03-17 18:31:42.775882+09 | | | 0 | 2017-03-17 18:31:40.808256+09
86 | 5 | 84 | 0 | 7 | 8 | 2017-03-17 18:31:46.870707+09 | | | 0 | 2017-03-17 18:31:42.782431+09
87 | 5 | 83 | 1 | 5 | 8 | 2017-03-17 18:31:46.886343+09 | | | 0 |
88 | 5 | 87 | 0 | 7 | 8 | 2017-03-17 18:31:47.984994+09 | | | 0 | 2017-03-17 18:31:46.890204+09
89 | 5 | 87 | 0 | 4 | 8 | 2017-03-17 18:31:47.989404+09 | | | 0 | 2017-03-17 18:31:47.989404+09
90 | 5 | 87 | 0 | 0 | 8 | 2017-03-17 18:31:40.755681+09 | | | 0 |
91 | 5 | 83 | 0 | 0 | 8 | 2017-03-17 18:31:40.755681+09 | | | 0 |
(9 rows)
id | upstream_id | downstream_id
----+-------------+---------------
61 | 85 | 86
62 | 84 | 87
63 | 88 | 89
64 | 89 | 90
65 | 87 | 91
(5 rows)
digdag=# select * from tasks order by id; select * from task_dependencies order by id;
id | attempt_id | parent_id | task_type | state | state_flags | updated_at | retry_at | state_params | retry_count | started_at
----+------------+-----------+-----------+-------+-------------+-------------------------------+----------+--------------+-------------+-------------------------------
83 | 5 | | 1 | 5 | 8 | 2017-03-17 18:31:40.755681+09 | | | 0 |
84 | 5 | 83 | 1 | 7 | 8 | 2017-03-17 18:31:46.881902+09 | | | 0 |
85 | 5 | 84 | 0 | 7 | 8 | 2017-03-17 18:31:42.775882+09 | | | 0 | 2017-03-17 18:31:40.808256+09
86 | 5 | 84 | 0 | 7 | 8 | 2017-03-17 18:31:46.870707+09 | | | 0 | 2017-03-17 18:31:42.782431+09
87 | 5 | 83 | 1 | 5 | 8 | 2017-03-17 18:31:46.886343+09 | | | 0 |
88 | 5 | 87 | 0 | 7 | 8 | 2017-03-17 18:31:47.984994+09 | | | 0 | 2017-03-17 18:31:46.890204+09
89 | 5 | 87 | 0 | 7 | 8 | 2017-03-17 18:31:48.902213+09 | | | 0 | 2017-03-17 18:31:47.989404+09
90 | 5 | 87 | 0 | 4 | 8 | 2017-03-17 18:31:48.906727+09 | | | 0 | 2017-03-17 18:31:48.906727+09
91 | 5 | 83 | 0 | 0 | 8 | 2017-03-17 18:31:40.755681+09 | | | 0 |
(9 rows)
id | upstream_id | downstream_id
----+-------------+---------------
61 | 85 | 86
62 | 84 | 87
63 | 88 | 89
64 | 89 | 90
65 | 87 | 91
(5 rows)
digdag=# select * from tasks order by id; select * from task_dependencies order by id;
id | attempt_id | parent_id | task_type | state | state_flags | updated_at | retry_at | state_params | retry_count | started_at
----+------------+-----------+-----------+-------+-------------+-------------------------------+----------+--------------+-------------+-------------------------------
83 | 5 | | 1 | 5 | 8 | 2017-03-17 18:31:40.755681+09 | | | 0 |
84 | 5 | 83 | 1 | 7 | 8 | 2017-03-17 18:31:46.881902+09 | | | 0 |
85 | 5 | 84 | 0 | 7 | 8 | 2017-03-17 18:31:42.775882+09 | | | 0 | 2017-03-17 18:31:40.808256+09
86 | 5 | 84 | 0 | 7 | 8 | 2017-03-17 18:31:46.870707+09 | | | 0 | 2017-03-17 18:31:42.782431+09
87 | 5 | 83 | 1 | 7 | 8 | 2017-03-17 18:31:52.042436+09 | | | 0 |
88 | 5 | 87 | 0 | 7 | 8 | 2017-03-17 18:31:47.984994+09 | | | 0 | 2017-03-17 18:31:46.890204+09
89 | 5 | 87 | 0 | 7 | 8 | 2017-03-17 18:31:48.902213+09 | | | 0 | 2017-03-17 18:31:47.989404+09
90 | 5 | 87 | 0 | 7 | 8 | 2017-03-17 18:31:52.036576+09 | | | 0 | 2017-03-17 18:31:48.906727+09
91 | 5 | 83 | 0 | 5 | 8 | 2017-03-17 18:31:52.895031+09 | | | 0 | 2017-03-17 18:31:52.048747+09
92 | 5 | 91 | 1 | 5 | 0 | 2017-03-17 18:31:53.01756+09 | | | 0 |
93 | 5 | 92 | 0 | 4 | 0 | 2017-03-17 18:31:53.022274+09 | | | 0 | 2017-03-17 18:31:53.022274+09
94 | 5 | 92 | 0 | 0 | 0 | 2017-03-17 18:31:52.895031+09 | | | 0 |
95 | 5 | 92 | 0 | 0 | 0 | 2017-03-17 18:31:52.895031+09 | | | 0 |
(13 rows)
id | upstream_id | downstream_id
----+-------------+---------------
61 | 85 | 86
62 | 84 | 87
63 | 88 | 89
64 | 89 | 90
65 | 87 | 91
66 | 93 | 94
67 | 94 | 95
(7 rows)
digdag=# select * from tasks order by id; select * from task_dependencies order by id;
id | attempt_id | parent_id | task_type | state | state_flags | updated_at | retry_at | state_params | retry_count | started_at
----+------------+-----------+-----------+-------+-------------+------------+----------+--------------+-------------+------------
(0 rows)
id | upstream_id | downstream_id
----+-------------+---------------
(0 rows)
irb(main):010:0> pp xs.inject({}){|a, x| x = x.clone; id = x.delete('id'); %w|exportParams resetStoreParams storeParams config retryCount stateParams report resumingTaskId retryAt error|.each{|k| x.delete(k)}; a[id] = x; a}
{83=>
{"subtaskConfig"=>{},
"attemptId"=>5,
"upstreams"=>[],
"updatedAt"=>"2017-03-17T09:31:56Z",
"startedAt"=>nil,
"parentId"=>nil,
"fullName"=>"+mydig",
"taskType"=>1,
"state"=>"success",
"stateFlags"=>8},
84=>
{"subtaskConfig"=>{},
"attemptId"=>5,
"upstreams"=>[],
"updatedAt"=>"2017-03-17T09:31:46Z",
"startedAt"=>nil,
"parentId"=>83,
"fullName"=>"+mydig+task0",
"taskType"=>1,
"state"=>"success",
"stateFlags"=>8},
85=>
{"subtaskConfig"=>{},
"attemptId"=>5,
"upstreams"=>[],
"updatedAt"=>"2017-03-17T09:31:42Z",
"startedAt"=>"2017-03-17T09:31:40Z",
"parentId"=>84,
"fullName"=>"+mydig+task0+task0_1",
"taskType"=>0,
"state"=>"success",
"stateFlags"=>8},
86=>
{"subtaskConfig"=>{},
"attemptId"=>5,
"upstreams"=>[85],
"updatedAt"=>"2017-03-17T09:31:46Z",
"startedAt"=>"2017-03-17T09:31:42Z",
"parentId"=>84,
"fullName"=>"+mydig+task0+task0_2",
"taskType"=>0,
"state"=>"success",
"stateFlags"=>8},
87=>
{"subtaskConfig"=>{},
"attemptId"=>5,
"upstreams"=>[84],
"updatedAt"=>"2017-03-17T09:31:52Z",
"startedAt"=>nil,
"parentId"=>83,
"fullName"=>"+mydig+task1",
"taskType"=>1,
"state"=>"success",
"stateFlags"=>8},
88=>
{"subtaskConfig"=>{},
"attemptId"=>5,
"upstreams"=>[],
"updatedAt"=>"2017-03-17T09:31:47Z",
"startedAt"=>"2017-03-17T09:31:46Z",
"parentId"=>87,
"fullName"=>"+mydig+task1+task1_1",
"taskType"=>0,
"state"=>"success",
"stateFlags"=>8},
89=>
{"subtaskConfig"=>{},
"attemptId"=>5,
"upstreams"=>[88],
"updatedAt"=>"2017-03-17T09:31:48Z",
"startedAt"=>"2017-03-17T09:31:47Z",
"parentId"=>87,
"fullName"=>"+mydig+task1+task1_2",
"taskType"=>0,
"state"=>"success",
"stateFlags"=>8},
90=>
{"subtaskConfig"=>{},
"attemptId"=>5,
"upstreams"=>[89],
"updatedAt"=>"2017-03-17T09:31:52Z",
"startedAt"=>"2017-03-17T09:31:48Z",
"parentId"=>87,
"fullName"=>"+mydig+task1+task1_3",
"taskType"=>0,
"state"=>"success",
"stateFlags"=>8},
91=>
{"subtaskConfig"=>
{"+loop-0"=>{"echo>"=>"task2====", "_export"=>{"i"=>0}},
"+loop-1"=>{"echo>"=>"task2====", "_export"=>{"i"=>1}},
"+loop-2"=>{"echo>"=>"task2====", "_export"=>{"i"=>2}}},
"attemptId"=>5,
"upstreams"=>[87],
"updatedAt"=>"2017-03-17T09:31:56Z",
"startedAt"=>"2017-03-17T09:31:52Z",
"parentId"=>83,
"fullName"=>"+mydig+task2",
"taskType"=>0,
"state"=>"success",
"stateFlags"=>8},
92=>
{"subtaskConfig"=>{},
"attemptId"=>5,
"upstreams"=>[],
"updatedAt"=>"2017-03-17T09:31:56Z",
"startedAt"=>nil,
"parentId"=>91,
"fullName"=>"+mydig+task2^sub",
"taskType"=>1,
"state"=>"success",
"stateFlags"=>0},
93=>
{"subtaskConfig"=>{},
"attemptId"=>5,
"upstreams"=>[],
"updatedAt"=>"2017-03-17T09:31:54Z",
"startedAt"=>"2017-03-17T09:31:53Z",
"parentId"=>92,
"fullName"=>"+mydig+task2^sub+loop-0",
"taskType"=>0,
"state"=>"success",
"stateFlags"=>0},
94=>
{"subtaskConfig"=>{},
"attemptId"=>5,
"upstreams"=>[93],
"updatedAt"=>"2017-03-17T09:31:55Z",
"startedAt"=>"2017-03-17T09:31:54Z",
"parentId"=>92,
"fullName"=>"+mydig+task2^sub+loop-1",
"taskType"=>0,
"state"=>"success",
"stateFlags"=>0},
95=>
{"subtaskConfig"=>{},
"attemptId"=>5,
"upstreams"=>[94],
"updatedAt"=>"2017-03-17T09:31:56Z",
"startedAt"=>"2017-03-17T09:31:55Z",
"parentId"=>92,
"fullName"=>"+mydig+task2^sub+loop-2",
"taskType"=>0,
"state"=>"success",
"stateFlags"=>0}}
# runWhile -> propagateBlockedChildrenToReady
LOG: execute S_10:
update tasks set updated_at = now(),
state = case when task_type = 1 then 5 when state_flags & 1 != 0 then 9 else 1 end
where state = 0 and parent_id = $1 and
exists (select * from tasks pt where pt.id = tasks.parent_id and pt.state in (5, 7) )
and not exists (
select * from tasks up join task_dependencies dep on up.id = dep.upstream_id
where dep.downstream_id = tasks.id and up.state not in (7)
)
DETAIL: parameters: $1 = '84'
# runWithHeartbeat -> taskSucceeded -> setRunningToShortCircuitSuccess
LOG: execute <unnamed>: update tasks set updated_at = now(), state = $1 where id = $2 and state = $3
DETAIL: parameters: $1 = '7', $2 = '89', $3 = '4'
# propagateBlockedChildrenToReady
LOG: execute S_19:
update tasks set updated_at = now(),
state = case when task_type = 1 then 5 when state_flags & 1 != 0 then 9 else 1 end
where state = 0 and parent_id = $1 and
exists (select * from tasks pt where pt.id = tasks.parent_id and pt.state in (5, 7) )
and not exists (
select * from tasks up join task_dependencies dep on up.id = dep.upstream_id
where dep.downstream_id = tasks.id and up.state not in (7)
)
DETAIL: parameters: $1 = '87'
# runWhile -> enqueueReadyTasks -> setReadyToRunning
LOG: execute <unnamed>: update tasks set started_at = coalesce(started_at, now()), updated_at = now(), state = $1
where id = $2 and state = $3
DETAIL: parameters: $1 = '4', $2 = '90', $3 = '1'
# propagateAllPlannedToDone -> setDoneFromDoneChildren -> setPlannedToSuccess
LOG: execute <unnamed>: update tasks set updated_at = now(), state = $1, state_params = NULL
where id = $2 and state = $3
DETAIL: parameters: $1 = '7', $2 = '87', $3 = '5'
# runWhile -> propagateBlockedChildrenToReady
LOG: execute S_19:
update tasks set updated_at = now(),
state = case when task_type = 1 then 5 when state_flags & 1 != 0 then 9 else 1 end
where state = 0 and parent_id = $1 and
exists (select * from tasks pt where pt.id = tasks.parent_id and pt.state in (5, 7) )
and not exists (
select * from tasks up join task_dependencies dep on up.id = dep.upstream_id
where dep.downstream_id = tasks.id and up.state not in (7)
)
DETAIL: parameters: $1 = '83'
# runWhile -> enqueueReadyTasks -> setReadyToRunning
LOG: execute <unnamed>: update tasks set started_at = coalesce(started_at, now()), updated_at = now(), state = $1
where id = $2 and state = $3
DETAIL: parameters: $1 = '4', $2 = '91', $3 = '1'
# runWithHeartbeat -> taskSucceeded -> setRunningToPlannedSuccessful
LOG: execute S_15: update tasks set updated_at = now(), state = $1 where id = $2 and state = $3
DETAIL: parameters: $1 = '5', $2 = '91', $3 = '4'
# runWhile -> propagateBlockedChildrenToReady
LOG: execute S_19:
update tasks set updated_at = now(),
state = case when task_type = 1 then 5 when state_flags & 1 != 0 then 9 else 1 end
where state = 0 and parent_id = $1 and
exists (select * from tasks pt where pt.id = tasks.parent_id and pt.state in (5, 7) )
and not exists (
select * from tasks up join task_dependencies dep on up.id = dep.upstream_id
where dep.downstream_id = tasks.id and up.state not in (7)
)
DETAIL: parameters: $1 = '91'
LOG: execute S_22: update task_state_details set subtask_config = $1, export_params = $2,
store_params = $3, report = $4, error = null, reset_store_params = $5 where id = $6
DETAIL: parameters: $1 = NULL, $2 = NULL, $3 = NULL, $4 = NULL, $5 = NULL, $6 = '94'
# runWhile -> propagateBlockedChildrenToReady
LOG: execute S_19:
update tasks set updated_at = now(),
state = case when task_type = 1 then 5 when state_flags & 1 != 0 then 9 else 1 end
where state = 0 and parent_id = $1 and
exists (select * from tasks pt where pt.id = tasks.parent_id and pt.state in (5, 7) )
and not exists (
select * from tasks up join task_dependencies dep on up.id = dep.upstream_id
where dep.downstream_id = tasks.id and up.state not in (7)
)
DETAIL: parameters: $1 = '92'
# runWhile -> enqueueReadyTasks -> setReadyToRunning
LOG: execute S_24: update tasks set started_at = coalesce(started_at, now()), updated_at = now(), state = $1
where id = $2 and state = $3
DETAIL: parameters: $1 = '4', $2 = '95', $3 = '1'
LOG: execute S_22: update task_state_details set subtask_config = $1, export_params = $2, store_params = $3,
report = $4, error = null, reset_store_params = $5 where id = $6
DETAIL: parameters: $1 = NULL, $2 = NULL, $3 = NULL, $4 = NULL, $5 = NULL, $6 = '95'
# propagateAllPlannedToDone -> setDoneFromDoneChildren -> setPlannedToSuccess
LOG: execute <unnamed>: update tasks set updated_at = now(), state = $1, state_params = NULL
where id = $2 and state = $3
DETAIL: parameters: $1 = '7', $2 = '92', $3 = '5'
# propagateAllPlannedToDone -> setDoneFromDoneChildren -> setPlannedToSuccess
LOG: execute <unnamed>: update tasks set updated_at = now(), state = $1, state_params = NULL
where id = $2 and state = $3
DETAIL: parameters: $1 = '7', $2 = '91', $3 = '5'
# propagateAllPlannedToDone -> setDoneFromDoneChildren -> setPlannedToSuccess
LOG: execute S_26: update tasks set updated_at = now(), state = $1, state_params = NULL
where id = $2 and state = $3
DETAIL: parameters: $1 = '7', $2 = '83', $3 = '5'
# runWhile -> trySetRetryWaitingToReady
LOG: execute S_22: update tasks set updated_at = now(), retry_at = NULL, state = 1
where state in (2,3) and retry_at <= now()