-
-
Save anonymous/0f8924c1d63666d101a64302e797fd35 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/lib/Minion/Backend/Pg.pm b/lib/Minion/Backend/Pg.pm | |
index c6e0ad2..b298c51 100644 | |
--- a/lib/Minion/Backend/Pg.pm | |
+++ b/lib/Minion/Backend/Pg.pm | |
@@ -29,11 +29,12 @@ sub enqueue { | |
my $db = $self->pg->db; | |
return $db->query( | |
- "insert into minion_jobs (args, attempts, delayed, priority, queue, task) | |
- values (?, ?, (now() + (interval '1 second' * ?)), ?, ?, ?) | |
+ "insert into minion_jobs | |
+ (args, attempts, delayed, parents, priority, queue, task) | |
+ values (?, ?, (now() + (interval '1 second' * ?)), ?, ?, ?, ?) | |
returning id", {json => $args}, $options->{attempts} // 1, | |
- $options->{delay} // 0, $options->{priority} // 0, | |
- $options->{queue} // 'default', $task | |
+ $options->{delay} // 0, $options->{parents} || [], | |
+ $options->{priority} // 0, $options->{queue} // 'default', $task | |
)->hash->{id}; | |
} | |
@@ -42,12 +43,14 @@ sub finish_job { shift->_update(0, @_) } | |
sub job_info { | |
shift->pg->db->query( | |
- 'select id, args, attempts, extract(epoch from created) as created, | |
+ 'select id, args, attempts, | |
+ array(select id from minion_jobs where j.id = any(parents)) as children, | |
+ extract(epoch from created) as created, | |
extract(epoch from delayed) as delayed, | |
- extract(epoch from finished) as finished, priority, queue, result, | |
- extract(epoch from retried) as retried, retries, | |
+ extract(epoch from finished) as finished, parents, priority, queue, | |
+ result, extract(epoch from retried) as retried, retries, | |
extract(epoch from started) as started, state, task, worker | |
- from minion_jobs where id = ?', shift | |
+ from minion_jobs as j where id = ?', shift | |
)->expand->hash; | |
} | |
@@ -120,6 +123,17 @@ sub repair { | |
)->hashes; | |
$fail->each(sub { $self->fail_job(@$_{qw(id retries)}, 'Worker went away') }); | |
+ # Jobs with missing parents | |
+ $db->query( | |
+ "update minion_jobs as j | |
+ set finished = now(), result = to_json('Parent went away'::text), | |
+ state = 'failed' | |
+ where state = 'inactive' and parents != '{}' | |
+ and array_length(parents, 1) != ( | |
+ select count(*) from minion_jobs where id = any(j.parents) | |
+ )" | |
+ ); | |
+ | |
# Old jobs | |
$db->query( | |
"delete from minion_jobs | |
@@ -152,7 +166,7 @@ sub stats { | |
"select state::text || '_jobs', count(*) from minion_jobs group by state | |
union all | |
select 'delayed_jobs', count(*) from minion_jobs | |
- where state = 'inactive' and delayed > now() | |
+ where state = 'inactive' and (delayed > now() or parents != '{}') | |
union all | |
select 'inactive_workers', count(*) from minion_workers | |
union all | |
@@ -187,9 +201,12 @@ sub _try { | |
"update minion_jobs | |
set started = now(), state = 'active', worker = ? | |
where id = ( | |
- select id from minion_jobs | |
+ select id from minion_jobs as j | |
where delayed <= now() and queue = any (?) and state = 'inactive' | |
- and task = any (?) | |
+ and task = any (?) and (parents = '{}' or array_length(parents, 1) = ( | |
+ select count(*) from minion_jobs as p | |
+ where p.id = any(j.parents) and p.state = 'finished' | |
+ )) | |
order by priority desc, created | |
limit 1 | |
for update skip locked | |
@@ -759,3 +776,6 @@ create trigger minion_jobs_notify_workers_trigger | |
-- 9 down | |
drop trigger if exists minion_jobs_notify_workers_trigger on minion_jobs; | |
drop function if exists minion_jobs_notify_workers(); | |
+ | |
+-- 10 up | |
+alter table minion_jobs add column parents bigint[] default '{}'::bigint[]; | |
diff --git a/t/pg.t b/t/pg.t | |
index 1a1e7ef..65945b2 100644 | |
--- a/t/pg.t | |
+++ b/t/pg.t | |
@@ -24,10 +24,11 @@ my $worker = $minion->repair->worker; | |
isa_ok $worker->minion->app, 'Mojolicious', 'has default application'; | |
# Migrate up and down | |
-is $minion->backend->pg->migrations->active, 9, 'active version is 9'; | |
+is $minion->backend->pg->migrations->active, 10, 'active version is 10'; | |
is $minion->backend->pg->migrations->migrate(0)->active, 0, | |
'active version is 0'; | |
-is $minion->backend->pg->migrations->migrate->active, 9, 'active version is 9'; | |
+is $minion->backend->pg->migrations->migrate->active, 10, | |
+ 'active version is 10'; | |
# Register and unregister | |
$worker->register; | |
@@ -194,8 +195,10 @@ like $batch->[0]{created}, qr/^[\d.]+$/, 'has created timestamp'; | |
is $batch->[1]{task}, 'fail', 'right task'; | |
is_deeply $batch->[1]{args}, [], 'right arguments'; | |
is_deeply $batch->[1]{result}, ['works'], 'right result'; | |
-is $batch->[1]{state}, 'finished', 'right state'; | |
-is $batch->[1]{priority}, 0, 'right priority'; | |
+is $batch->[1]{state}, 'finished', 'right state'; | |
+is $batch->[1]{priority}, 0, 'right priority'; | |
+is_deeply $batch->[1]{parents}, [], 'right parents'; | |
+is_deeply $batch->[1]{children}, [], 'right children'; | |
is $batch->[1]{retries}, 1, 'job has been retried'; | |
like $batch->[1]{created}, qr/^[\d.]+$/, 'has created timestamp'; | |
like $batch->[1]{delayed}, qr/^[\d.]+$/, 'has delayed timestamp'; | |
@@ -593,6 +596,37 @@ is $minion->job($id4)->info->{result}, 'Non-zero exit status (1)', | |
'right result'; | |
$worker->unregister; | |
+# Job dependencies | |
+$worker = $minion->worker->register; | |
+$id = $minion->enqueue('test'); | |
+$id2 = $minion->enqueue('test'); | |
+$id3 = $minion->enqueue(test => [] => {parents => [$id, $id2]}); | |
+is $minion->stats->{delayed_jobs}, 1, 'one delayed job'; | |
+$job = $worker->dequeue(0); | |
+is $job->id, $id, 'right id'; | |
+is_deeply $job->info->{children}, [$id3], 'right children'; | |
+is_deeply $job->info->{parents}, [], 'right parents'; | |
+$job2 = $worker->dequeue(0); | |
+is $job2->id, $id2, 'right id'; | |
+is_deeply $job2->info->{children}, [$id3], 'right children'; | |
+is_deeply $job2->info->{parents}, [], 'right parents'; | |
+ok !$worker->dequeue(0), 'parents are not ready yet'; | |
+$job->finish; | |
+ok !$worker->dequeue(0), 'parents are not ready yet'; | |
+$job2->finish; | |
+$job = $worker->dequeue(0); | |
+is $job->id, $id3, 'right id'; | |
+is_deeply $job->info->{children}, [], 'right children'; | |
+is_deeply $job->info->{parents}, [$id, $id2], 'right parents'; | |
+$id = $minion->enqueue(test => [] => {parents => [-1]}); | |
+ok !$worker->dequeue(0), 'job with missing parent will never be ready'; | |
+$minion->repair; | |
+like $minion->job($id)->info->{finished}, qr/^[\d.]+$/, | |
+ 'has finished timestamp'; | |
+is $minion->job($id)->info->{state}, 'failed', 'job is no longer active'; | |
+is $minion->job($id)->info->{result}, 'Parent went away', 'right result'; | |
+$worker->unregister; | |
+ | |
# Clean up once we are done | |
$pg->db->query('drop schema minion_test cascade'); | |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.