Skip to content

Instantly share code, notes, and snippets.

Created May 16, 2016 21:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/0f8924c1d63666d101a64302e797fd35 to your computer and use it in GitHub Desktop.
Save anonymous/0f8924c1d63666d101a64302e797fd35 to your computer and use it in GitHub Desktop.
diff --git a/lib/Minion/Backend/Pg.pm b/lib/Minion/Backend/Pg.pm
index c6e0ad2..b298c51 100644
--- a/lib/Minion/Backend/Pg.pm
+++ b/lib/Minion/Backend/Pg.pm
@@ -29,11 +29,12 @@ sub enqueue {
my $db = $self->pg->db;
return $db->query(
- "insert into minion_jobs (args, attempts, delayed, priority, queue, task)
- values (?, ?, (now() + (interval '1 second' * ?)), ?, ?, ?)
+ "insert into minion_jobs
+ (args, attempts, delayed, parents, priority, queue, task)
+ values (?, ?, (now() + (interval '1 second' * ?)), ?, ?, ?, ?)
returning id", {json => $args}, $options->{attempts} // 1,
- $options->{delay} // 0, $options->{priority} // 0,
- $options->{queue} // 'default', $task
+ $options->{delay} // 0, $options->{parents} || [],
+ $options->{priority} // 0, $options->{queue} // 'default', $task
)->hash->{id};
}
@@ -42,12 +43,14 @@ sub finish_job { shift->_update(0, @_) }
sub job_info {
shift->pg->db->query(
- 'select id, args, attempts, extract(epoch from created) as created,
+ 'select id, args, attempts,
+ array(select id from minion_jobs where j.id = any(parents)) as children,
+ extract(epoch from created) as created,
extract(epoch from delayed) as delayed,
- extract(epoch from finished) as finished, priority, queue, result,
- extract(epoch from retried) as retried, retries,
+ extract(epoch from finished) as finished, parents, priority, queue,
+ result, extract(epoch from retried) as retried, retries,
extract(epoch from started) as started, state, task, worker
- from minion_jobs where id = ?', shift
+ from minion_jobs as j where id = ?', shift
)->expand->hash;
}
@@ -120,6 +123,17 @@ sub repair {
)->hashes;
$fail->each(sub { $self->fail_job(@$_{qw(id retries)}, 'Worker went away') });
+ # Jobs with missing parents
+ $db->query(
+ "update minion_jobs as j
+ set finished = now(), result = to_json('Parent went away'::text),
+ state = 'failed'
+ where state = 'inactive' and parents != '{}'
+ and array_length(parents, 1) != (
+ select count(*) from minion_jobs where id = any(j.parents)
+ )"
+ );
+
# Old jobs
$db->query(
"delete from minion_jobs
@@ -152,7 +166,7 @@ sub stats {
"select state::text || '_jobs', count(*) from minion_jobs group by state
union all
select 'delayed_jobs', count(*) from minion_jobs
- where state = 'inactive' and delayed > now()
+ where state = 'inactive' and (delayed > now() or parents != '{}')
union all
select 'inactive_workers', count(*) from minion_workers
union all
@@ -187,9 +201,12 @@ sub _try {
"update minion_jobs
set started = now(), state = 'active', worker = ?
where id = (
- select id from minion_jobs
+ select id from minion_jobs as j
where delayed <= now() and queue = any (?) and state = 'inactive'
- and task = any (?)
+ and task = any (?) and (parents = '{}' or array_length(parents, 1) = (
+ select count(*) from minion_jobs as p
+ where p.id = any(j.parents) and p.state = 'finished'
+ ))
order by priority desc, created
limit 1
for update skip locked
@@ -759,3 +776,6 @@ create trigger minion_jobs_notify_workers_trigger
-- 9 down
drop trigger if exists minion_jobs_notify_workers_trigger on minion_jobs;
drop function if exists minion_jobs_notify_workers();
+
+-- 10 up
+alter table minion_jobs add column parents bigint[] default '{}'::bigint[];
diff --git a/t/pg.t b/t/pg.t
index 1a1e7ef..65945b2 100644
--- a/t/pg.t
+++ b/t/pg.t
@@ -24,10 +24,11 @@ my $worker = $minion->repair->worker;
isa_ok $worker->minion->app, 'Mojolicious', 'has default application';
# Migrate up and down
-is $minion->backend->pg->migrations->active, 9, 'active version is 9';
+is $minion->backend->pg->migrations->active, 10, 'active version is 10';
is $minion->backend->pg->migrations->migrate(0)->active, 0,
'active version is 0';
-is $minion->backend->pg->migrations->migrate->active, 9, 'active version is 9';
+is $minion->backend->pg->migrations->migrate->active, 10,
+ 'active version is 10';
# Register and unregister
$worker->register;
@@ -194,8 +195,10 @@ like $batch->[0]{created}, qr/^[\d.]+$/, 'has created timestamp';
is $batch->[1]{task}, 'fail', 'right task';
is_deeply $batch->[1]{args}, [], 'right arguments';
is_deeply $batch->[1]{result}, ['works'], 'right result';
-is $batch->[1]{state}, 'finished', 'right state';
-is $batch->[1]{priority}, 0, 'right priority';
+is $batch->[1]{state}, 'finished', 'right state';
+is $batch->[1]{priority}, 0, 'right priority';
+is_deeply $batch->[1]{parents}, [], 'right parents';
+is_deeply $batch->[1]{children}, [], 'right children';
is $batch->[1]{retries}, 1, 'job has been retried';
like $batch->[1]{created}, qr/^[\d.]+$/, 'has created timestamp';
like $batch->[1]{delayed}, qr/^[\d.]+$/, 'has delayed timestamp';
@@ -593,6 +596,37 @@ is $minion->job($id4)->info->{result}, 'Non-zero exit status (1)',
'right result';
$worker->unregister;
+# Job dependencies
+$worker = $minion->worker->register;
+$id = $minion->enqueue('test');
+$id2 = $minion->enqueue('test');
+$id3 = $minion->enqueue(test => [] => {parents => [$id, $id2]});
+is $minion->stats->{delayed_jobs}, 1, 'one delayed job';
+$job = $worker->dequeue(0);
+is $job->id, $id, 'right id';
+is_deeply $job->info->{children}, [$id3], 'right children';
+is_deeply $job->info->{parents}, [], 'right parents';
+$job2 = $worker->dequeue(0);
+is $job2->id, $id2, 'right id';
+is_deeply $job2->info->{children}, [$id3], 'right children';
+is_deeply $job2->info->{parents}, [], 'right parents';
+ok !$worker->dequeue(0), 'parents are not ready yet';
+$job->finish;
+ok !$worker->dequeue(0), 'parents are not ready yet';
+$job2->finish;
+$job = $worker->dequeue(0);
+is $job->id, $id3, 'right id';
+is_deeply $job->info->{children}, [], 'right children';
+is_deeply $job->info->{parents}, [$id, $id2], 'right parents';
+$id = $minion->enqueue(test => [] => {parents => [-1]});
+ok !$worker->dequeue(0), 'job with missing parent will never be ready';
+$minion->repair;
+like $minion->job($id)->info->{finished}, qr/^[\d.]+$/,
+ 'has finished timestamp';
+is $minion->job($id)->info->{state}, 'failed', 'job is no longer active';
+is $minion->job($id)->info->{result}, 'Parent went away', 'right result';
+$worker->unregister;
+
# Clean up once we are done
$pg->db->query('drop schema minion_test cascade');
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment