Skip to content

Instantly share code, notes, and snippets.

/retry.diff Secret

Created October 29, 2015 22:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/c5ade3275a15f1aad34f to your computer and use it in GitHub Desktop.
Save anonymous/c5ade3275a15f1aad34f to your computer and use it in GitHub Desktop.
diff --git a/Changes b/Changes
index 5a411ff..9747758 100644
--- a/Changes
+++ b/Changes
@@ -1,7 +1,12 @@
-3.0 2015-10-29
+3.0 2015-10-30
- Removed Minion::Backend::File, because DBM::Deep quickly becomes unusably
slow, we recommend the use of Minion::Backend::SQLite instead.
+ - Added support for automatically retried jobs.
+ - Added attempts attribute to Minion::Job.
+ - Added retry_delay attribute to Minion.
+ - Added attempts option to enqueue method in Minion and Minion::Backend::Pg.
+ - Added -A option to job command.
2.05 2015-10-15
- Fixed bug where jobs could sometimes not be finished correctly by the worker
diff --git a/lib/Minion.pm b/lib/Minion.pm
index b262c4d..471b806 100644
--- a/lib/Minion.pm
+++ b/lib/Minion.pm
@@ -13,6 +13,7 @@ has app => sub { Mojo::Server->new->build_app('Mojo::HelloWorld') };
has 'backend';
has missing_after => 86400;
has remove_after => 864000;
+has retry_delay => sub { \&_retry };
has tasks => sub { {} };
our $VERSION = '3.0';
@@ -26,11 +27,12 @@ sub job {
return undef unless my $job = $self->backend->job_info($id);
return Minion::Job->new(
- args => $job->{args},
- id => $job->{id},
- minion => $self,
- retries => $job->{retries},
- task => $job->{task}
+ args => $job->{args},
+ attempts => $job->{attempts},
+ id => $job->{id},
+ minion => $self,
+ retries => $job->{retries},
+ task => $job->{task}
);
}
@@ -72,6 +74,8 @@ sub _delegate {
return $self;
}
+sub _retry { ($_[0]**4) + 15 + $_[0] }
+
1;
=encoding utf8
@@ -241,6 +245,19 @@ Amount of time in seconds after which jobs that have reached the state
C<finished> will be removed automatically by L</"repair">, defaults to
C<864000> (10 days).
+=head2 retry_delay
+
+ my $cb = $minion->retry_delay;
+ $minion = $minion->retry_delay(sub {...});
+
+A callback used to calculate the delay for automatically retried jobs, defaults
+to C<(retries ** 4) + 15 + retries>.
+
+ $minion->retry_delay(sub {
+ my $retries = shift;
+ return ($retries ** 4) + 15;
+ });
+
=head2 tasks
my $tasks = $minion->tasks;
@@ -282,6 +299,12 @@ These options are currently available:
=over 2
+=item attempts
+
+ attempts => 25
+
+Number of times job will be retried automatically.
+
=item delay
delay => 10
diff --git a/lib/Minion/Backend.pm b/lib/Minion/Backend.pm
index 475417e..127a14e 100644
--- a/lib/Minion/Backend.pm
+++ b/lib/Minion/Backend.pm
@@ -109,6 +109,12 @@ These fields are currently available:
Job arguments.
+=item attempts
+
+ attempts => 25
+
+Number of times job will be retried automatically.
+
=item id
id => '10023'
@@ -141,6 +147,12 @@ These options are currently available:
=over 2
+=item attempts
+
+ attempts => 25
+
+Number of times job will be retried automatically.
+
=item delay
delay => 10
@@ -204,6 +216,12 @@ These fields are currently available:
Job arguments.
+=item attempts
+
+ attempts => 25
+
+Number of times job will be retried automatically.
+
=item created
created => 784111777
diff --git a/lib/Minion/Backend/Pg.pm b/lib/Minion/Backend/Pg.pm
index 2f651db..cd9f39a 100644
--- a/lib/Minion/Backend/Pg.pm
+++ b/lib/Minion/Backend/Pg.pm
@@ -30,10 +30,11 @@ sub enqueue {
my $db = $self->pg->db;
return $db->query(
- "insert into minion_jobs (args, delayed, priority, queue, task)
- values (?, (now() + (interval '1 second' * ?)), ?, ?, ?)
- returning id", {json => $args}, $options->{delay} // 0,
- $options->{priority} // 0, $options->{queue} // 'default', $task
+ "insert into minion_jobs (args, attempts, delayed, priority, queue, task)
+ values (?, ?, (now() + (interval '1 second' * ?)), ?, ?, ?)
+ returning id", {json => $args}, $options->{attempts} // 1,
+ $options->{delay} // 0, $options->{priority} // 0,
+ $options->{queue} // 'default', $task
)->hash->{id};
}
@@ -42,7 +43,7 @@ sub finish_job { shift->_update(0, @_) }
sub job_info {
shift->pg->db->query(
- 'select id, args, extract(epoch from created) as created,
+ 'select id, args, attempts, extract(epoch from created) as created,
extract(epoch from delayed) as delayed,
extract(epoch from finished) as finished, priority, queue, result,
extract(epoch from retried) as retried, retries,
@@ -196,7 +197,7 @@ sub _try {
limit 1
for update
)
- returning id, args, retries, task", $id,
+ returning id, args, attempts, retries, task", $id,
$options->{queues} || ['default'], [keys %{$self->minion->tasks}]
)->expand->hash;
}
@@ -281,6 +282,12 @@ These fields are currently available:
Job arguments.
+=item attempts
+
+ attempts => 25
+
+Number of times job will be retried automatically.
+
=item id
id => '10023'
@@ -313,6 +320,12 @@ These options are currently available:
=over 2
+=item attempts
+
+ attempts => 25
+
+Number of times job will be retried automatically.
+
=item delay
delay => 10
@@ -373,6 +386,12 @@ These fields are currently available:
Job arguments.
+=item attempts
+
+ attempts => 25
+
+Number of times job will be retried automatically.
+
=item created
created => 784111777
@@ -659,3 +678,6 @@ create index on minion_jobs (state);
-- 4 up
alter table minion_jobs add column queue text not null default 'default';
+
+-- 5 up
+alter table minion_jobs add column attempts int not null default 1;
diff --git a/lib/Minion/Command/minion/job.pm b/lib/Minion/Command/minion/job.pm
index 9390332..8ed9b04 100644
--- a/lib/Minion/Command/minion/job.pm
+++ b/lib/Minion/Command/minion/job.pm
@@ -14,11 +14,12 @@ sub run {
my ($args, $options) = ([], {});
GetOptionsFromArray \@args,
- 'a|args=s' => sub { $args = decode_json($_[1]) },
+ 'A|attempts=i' => \$options->{attempts},
+ 'a|args=s' => sub { $args = decode_json($_[1]) },
'd|delay=i' => \$options->{delay},
'e|enqueue=s' => \my $enqueue,
- 'l|limit=i' => \(my $limit = 100),
- 'o|offset=i' => \(my $offset = 0),
+ 'l|limit=i' => \(my $limit = 100),
+ 'o|offset=i' => \(my $offset = 0),
'p|priority=i' => \$options->{priority},
'q|queue=s' => \$options->{queue},
'R|retry' => \my $retry,
@@ -119,6 +120,7 @@ Minion::Command::minion::job - Minion job command
./myapp.pl minion job -r 10023
Options:
+ -A, --attempts <number> Number of times job will be retried automatically
-a, --args <JSON array> Arguments for new job in JSON format
-d, --delay <seconds> Delay new job for this many seconds
-e, --enqueue <name> New job to be enqueued
diff --git a/lib/Minion/Job.pm b/lib/Minion/Job.pm
index b0215ea..42f8e17 100644
--- a/lib/Minion/Job.pm
+++ b/lib/Minion/Job.pm
@@ -4,15 +4,22 @@ use Mojo::Base 'Mojo::EventEmitter';
use Mojo::IOLoop;
use POSIX 'WNOHANG';
-has [qw(args id minion retries task)];
+has [qw(args attempts id minion retries task)];
sub app { shift->minion->app }
sub fail {
my $self = shift;
- my $err = shift // 'Unknown error';
- my $ok = $self->minion->backend->fail_job($self->id, $self->retries, $err);
- return $ok ? !!$self->emit(failed => $err) : undef;
+ my $err = shift // 'Unknown error';
+
+ return undef
+ unless $self->minion->backend->fail_job($self->id, $self->retries, $err);
+
+ my $retries = $self->emit(failed => $err)->retries;
+ my $attempts = $self->attempts;
+ return $self->retry({delay => $self->minion->retry_delay->($retries)})
+ if $attempts > 1 && $attempts > ($retries + 1);
+ return 1;
}
sub finish {
@@ -144,6 +151,13 @@ L<Minion::Job> implements the following attributes.
Arguments passed to task.
+=head2 attempts
+
+ my $attempts = $job->attempts;
+ $job = $job->attempts(25);
+
+Number of times job will be retried automatically.
+
=head2 id
my $id = $job->id;
@@ -224,6 +238,12 @@ These fields are currently available:
Job arguments.
+=item attempts
+
+ attempts => 25
+
+Number of times job will be retried automatically.
+
=item created
created => 784111777
diff --git a/lib/Minion/Worker.pm b/lib/Minion/Worker.pm
index 5d3af5c..a3e4e6b 100644
--- a/lib/Minion/Worker.pm
+++ b/lib/Minion/Worker.pm
@@ -12,11 +12,12 @@ sub dequeue {
my $minion = $self->minion;
return undef unless my $job = $minion->backend->dequeue($id, $wait, $options);
$job = Minion::Job->new(
- args => $job->{args},
- id => $job->{id},
- minion => $minion,
- retries => $job->{retries},
- task => $job->{task}
+ args => $job->{args},
+ attempts => $job->{attempts},
+ id => $job->{id},
+ minion => $minion,
+ retries => $job->{retries},
+ task => $job->{task}
);
$self->emit(dequeue => $job);
return $job;
diff --git a/t/pg.t b/t/pg.t
index 973d3fc..af7d0d8 100644
--- a/t/pg.t
+++ b/t/pg.t
@@ -460,6 +460,33 @@ is $job->info->{state}, 'failed', 'right state';
is $job->info->{result}, 'Non-zero exit status (1)', 'right result';
$worker->unregister;
+# Auto retry
+is $minion->retry_delay->(0), 15, 'right result';
+is $minion->retry_delay->(1), 17, 'right result';
+is $minion->retry_delay->(2), 33, 'right result';
+is $minion->retry_delay->(3), 99, 'right result';
+is $minion->retry_delay->(4), 275, 'right result';
+is $minion->retry_delay->(5), 645, 'right result';
+is $minion->retry_delay->(25), 390665, 'right result';
+$id = $minion->enqueue(exit => [] => {attempts => 2});
+$job = $worker->register->dequeue(0);
+is $job->id, $id, 'right id';
+is $job->retries, 0, 'job has not been retried';
+is $job->attempts, 2, 'job will be attempted twice';
+$job->perform;
+is $job->info->{state}, 'inactive', 'right state';
+ok $job->info->{retried} < $job->info->{delayed}, 'delayed timestamp';
+$minion->backend->pg->db->query(
+ 'update minion_jobs set delayed = now() where id = ?', $id);
+$job = $worker->register->dequeue(0);
+is $job->id, $id, 'right id';
+is $job->retries, 1, 'job has been retried once';
+is $job->attempts, 2, 'job will be attempted twice';
+$job->perform;
+is $job->info->{state}, 'failed', 'right state';
+is $job->info->{result}, 'Non-zero exit status (1)', 'right result';
+$worker->unregister;
+
# A job needs to be dequeued again after a retry
$minion->add_task(restart => sub { });
$id = $minion->enqueue('restart');
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment