Skip to content

Instantly share code, notes, and snippets.

Created June 13, 2014 04:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/8899b1f7e0b507664514 to your computer and use it in GitHub Desktop.
Save anonymous/8899b1f7e0b507664514 to your computer and use it in GitHub Desktop.
package Minion::Backend::Storable;
use Mojo::Base 'Minion::Backend';
use List::Util 'first';
use Mojo::IOLoop;
use Sys::Hostname 'hostname';
use Time::HiRes 'time';
has 'file';
sub dequeue {
my ($self, $id) = @_;
my $guard = $self->_guard;
my @inactive = grep { $_->{state} eq 'inactive' } @{$guard->_jobs};
my $now = time;
my @ready = sort { $b->{priority} <=> $a->{priority} }
grep { $_->{delayed} < $now } @inactive;
return undef
unless my $job = first { $self->minion->tasks->{$_->{task}} } @ready;
$guard->_write;
@$job{qw(started state worker)} = (time, 'active', $id);
return $job;
}
sub enqueue {
my ($self, $task) = (shift, shift);
my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
my $args = shift // [];
my $options = shift // {};
my $guard = $self->_guard->_write;
my $job = {
args => $args,
created => time,
delayed => $options->{delayed} ? $options->{delayed} : 1,
id => $guard->_id,
priority => $options->{priority} // 0,
restarts => 0,
state => 'inactive',
task => $task
};
# Blocking
push @{$guard->_jobs}, $job;
return $job->{id} unless $cb;
# Non-blocking
Mojo::IOLoop->next_tick(sub { $self->$cb(undef, $job->{id}) });
}
sub fail_job { shift->_update(1, @_) }
sub finish_job { shift->_update(0, @_) }
sub job_info { shift->_guard->_job(shift) }
sub list_jobs {
my ($self, $skip, $limit) = @_;
my $guard = $self->_guard;
my @jobs = sort { $b->{created} cmp $a->{created} } @{$guard->_jobs};
return [@jobs[$skip .. ($skip + $limit - 1)]];
}
sub list_workers {
my ($self, $skip, $limit) = @_;
my $guard = $self->_guard;
my @workers = map { $self->_worker_info($guard, $_->{id}) }
sort { $b->{started} cmp $a->{started} } @{$guard->_workers};
return [@workers[$skip .. ($skip + $limit - 1)]];
}
sub new { shift->SUPER::new(file => shift) }
sub register_worker {
my $guard = shift->_guard->_write;
my $id = $guard->_id;
push @{$guard->_workers},
{host => hostname, id => $id, pid => $$, started => time};
return $id;
}
sub remove_job {
my ($self, $id) = @_;
my $guard = $self->_guard->_write;
return undef unless my $job = $guard->_job($id);
return undef
unless grep { $job->{state} eq $_ } qw(failed finished inactive);
@{$guard->_jobs} = grep { $_->{id} ne $id } @{$guard->_jobs};
return 1;
}
sub repair {
my $self = shift;
# Check workers on this host (all should be owned by the same user)
my $guard = $self->_guard->_write;
@{$guard->_workers}
= grep { $_->{host} ne hostname || kill 0, $_->{pid} } @{$guard->_workers};
# Abandoned jobs
for my $job (@{$guard->_jobs}) {
next if $job->{state} ne 'active' || $guard->_worker($job->{worker});
@$job{qw(error state)} = ('Worker went away.', 'failed');
}
}
sub reset { shift->_guard->_spurt({}) }
sub restart_job {
my ($self, $id) = @_;
my $guard = $self->_guard->_write;
return undef unless my $job = $guard->_job($id);
return undef unless $job->{state} eq 'failed' || $job->{state} eq 'finished';
$job->{restarts} += 1;
@$job{qw(restarted state)} = (time, 'inactive');
delete $job->{$_} for qw(error finished result started worker);
return 1;
}
sub stats {
my $self = shift;
my $guard = $self->_guard;
my $jobs = $guard->_jobs;
my %seen;
my $active = grep { $_->{state} eq 'active' && !$seen{$_}++ } @$jobs;
return {
active_workers => $active,
inactive_workers => @{$guard->_workers} - $active,
active_jobs => scalar(grep { $_->{state} eq 'active' } @$jobs),
inactive_jobs => scalar(grep { $_->{state} eq 'inactive' } @$jobs),
failed_jobs => scalar(grep { $_->{state} eq 'failed' } @$jobs),
finished_jobs => scalar(grep { $_->{state} eq 'finished' } @$jobs),
};
}
sub unregister_worker {
my ($self, $id) = @_;
my $guard = $self->_guard->_write;
@{$guard->_workers} = grep { $_->{id} ne $id } @{$guard->_workers};
}
sub worker_info { $_[0]->_worker_info($_[0]->_guard, $_[1]) }
sub _guard { Minion::Backend::Storable::_Guard->new(shift->{file}) }
sub _update {
my ($self, $fail, $id, $err) = @_;
my $guard = $self->_guard->_write;
return undef unless my $job = $guard->_job($id);
return undef unless $job->{state} eq 'active';
$job->{finished} = time;
$job->{state} = $fail ? 'failed' : 'finished';
$job->{error} = $err if $err;
return 1;
}
sub _worker_info {
my ($self, $guard, $id) = @_;
return undef unless my $worker = $guard->_worker($id);
return {
%$worker,
jobs => [map { $_->{id} } grep { $_->{worker} eq $id } @{$guard->_jobs}]
};
}
package Minion::Backend::Storable::_Guard;
use Mojo::Base -base;
use Fcntl ':flock';
use IO::Compress::Gzip 'gzip';
use IO::Uncompress::Gunzip 'gunzip';
use List::Util 'first';
use Mojo::Util qw(md5_sum slurp spurt);
use Storable qw(freeze thaw);
sub DESTROY {
my $self = shift;
$self->_spurt($self->_data) if $self->{write};
flock $self->{lock}, LOCK_UN;
}
sub new {
my $self = shift->SUPER::new(file => shift);
$self->_spurt({}) unless -f $self->{file};
open $self->{lock}, '<', $self->{file};
flock $self->{lock}, LOCK_EX;
return $self;
}
sub _data { $_[0]->{data} //= $_[0]->_slurp }
sub _id {
my $self = shift;
my $id;
do { $id = md5_sum(time . rand 999) }
while $self->_worker($id) || $self->_job($id);
return $id;
}
sub _job {
first { $_[1] eq $_->{id} } @{$_[0]->_jobs};
}
sub _jobs { shift->_data->{jobs} //= [] }
sub _slurp {
gunzip \(my $compressed = slurp shift->{file}), \my $uncompressed;
return thaw $uncompressed;
}
sub _spurt {
gzip \(my $uncompressed = freeze(pop)), \my $compressed;
spurt $compressed, shift->{file};
}
sub _worker {
first { $_[1] eq $_->{id} } @{$_[0]->_workers};
}
sub _workers { shift->_data->{workers} //= [] }
sub _write { ++$_[0]->{write} && return $_[0] }
1;
use Mojo::Base -strict;
use Test::More;
use File::Spec::Functions 'catfile';
use File::Temp 'tempdir';
use Minion;
use Mojo::IOLoop;
use Storable qw(retrieve store);
use Sys::Hostname 'hostname';
# Clean up before start
my $tmpdir = tempdir CLEANUP => 1;
my $file = catfile $tmpdir, 'minion.data';
my $minion = Minion->new(Storable => $file);
$minion->reset;
# Nothing to repair
my $worker = $minion->repair->worker;
isa_ok $worker->minion->app, 'Mojolicious', 'has default application';
# Register and unregister
$worker->register;
like $worker->info->{started}, qr/^[\d.]+$/, 'has timestamp';
is $worker->unregister->info, undef, 'no information';
is $worker->register->info->{host}, hostname, 'right host';
is $worker->info->{pid}, $$, 'right pid';
is $worker->unregister->info, undef, 'no information';
# Repair dead worker
$minion->add_task(test => sub { });
my $worker2 = $minion->worker->register;
isnt $worker2->id, $worker->id, 'new id';
my $id = $minion->enqueue('test');
my $job = $worker2->dequeue;
is $job->id, $id, 'right id';
is $worker2->info->{jobs}[0], $job->id, 'right id';
$id = $worker2->id;
undef $worker2;
is $job->info->{state}, 'active', 'job is still active';
my $guard = $minion->backend->_guard->_write;
my $info = $guard->_worker($id);
ok $info, 'is registered';
my $pid = 4000;
$pid++ while kill 0, $pid;
$info->{pid} = $pid;
undef $guard;
$minion->repair;
ok !$minion->worker->id($id)->info, 'not registered';
is $job->info->{state}, 'failed', 'job is no longer active';
is $job->info->{error}, 'Worker went away.', 'right error';
# Repair abandoned job
$worker->register;
$id = $minion->enqueue('test');
$job = $worker->dequeue;
is $job->id, $id, 'right id';
$worker->unregister;
$minion->repair;
is $job->info->{state}, 'failed', 'job is no longer active';
is $job->info->{error}, 'Worker went away.', 'right error';
# List workers
$worker = $minion->worker->register;
$worker2 = $minion->worker->register;
my $batch = $minion->backend->list_workers(0, 10);
ok $batch->[0]{id}, 'has id';
is $batch->[0]{host}, hostname, 'right host';
is $batch->[0]{pid}, $$, 'right pid';
is $batch->[1]{host}, hostname, 'right host';
is $batch->[1]{pid}, $$, 'right pid';
ok !$batch->[2], 'no more results';
$batch = $minion->backend->list_workers(0, 1);
is $batch->[0]{id}, $worker2->id, 'right id';
ok !$batch->[1], 'no more results';
$batch = $minion->backend->list_workers(1, 1);
is $batch->[0]{id}, $worker->id, 'right id';
ok !$batch->[1], 'no more results';
$worker->unregister;
$worker2->unregister;
# Reset
$minion->reset->repair;
ok !@{$minion->backend->_guard->_jobs}, 'no jobs';
ok !@{$minion->backend->_guard->_workers}, 'no workers';
# Tasks
my $results = catfile $tmpdir, 'results.data';
store [], $results;
$minion->add_task(
add => sub {
my ($job, $first, $second) = @_;
my $result = retrieve $results;
push @$result, $first + $second;
store $result, $results;
}
);
$minion->add_task(exit => sub { exit 1 });
$minion->add_task(fail => sub { die "Intentional failure!\n" });
# Stats
my $stats = $minion->stats;
is $stats->{active_workers}, 0, 'no active workers';
is $stats->{inactive_workers}, 0, 'no inactive workers';
is $stats->{active_jobs}, 0, 'no active jobs';
is $stats->{failed_jobs}, 0, 'no failed jobs';
is $stats->{finished_jobs}, 0, 'no finished jobs';
is $stats->{inactive_jobs}, 0, 'no inactive jobs';
$worker = $minion->worker->register;
is $minion->stats->{inactive_workers}, 1, 'one inactive worker';
$minion->enqueue('fail');
$minion->enqueue('fail');
is $minion->stats->{inactive_jobs}, 2, 'two inactive jobs';
$job = $worker->dequeue;
$stats = $minion->stats;
is $stats->{active_workers}, 1, 'one active worker';
is $stats->{active_jobs}, 1, 'one active job';
is $stats->{inactive_jobs}, 1, 'one inactive job';
ok $job->finish, 'job finished';
is $minion->stats->{finished_jobs}, 1, 'one finished job';
$job = $worker->dequeue;
ok $job->fail, 'job failed';
is $minion->stats->{failed_jobs}, 1, 'one failed job';
ok $job->restart, 'job restarted';
is $minion->stats->{failed_jobs}, 0, 'no failed jobs';
ok $worker->dequeue->finish, 'job finished';
$worker->unregister;
$stats = $minion->stats;
is $stats->{active_workers}, 0, 'no active workers';
is $stats->{inactive_workers}, 0, 'no inactive workers';
is $stats->{active_jobs}, 0, 'no active jobs';
is $stats->{failed_jobs}, 0, 'one failed job';
is $stats->{finished_jobs}, 2, 'one finished job';
is $stats->{inactive_jobs}, 0, 'no inactive jobs';
# List jobs
$batch = $minion->backend->list_jobs(0, 10);
ok $batch->[0]{id}, 'has id';
is $batch->[0]{task}, 'fail', 'right task';
is $batch->[0]{state}, 'finished', 'right state';
is $batch->[0]{restarts}, 1, 'job has been restarted';
is $batch->[1]{task}, 'fail', 'right task';
is $batch->[1]{state}, 'finished', 'right state';
is $batch->[1]{restarts}, 0, 'job has not been restarted';
ok !$batch->[2], 'no more results';
$batch = $minion->backend->list_jobs(0, 1);
is $batch->[0]{restarts}, 1, 'job has been restarted';
ok !$batch->[1], 'no more results';
$batch = $minion->backend->list_jobs(1, 1);
is $batch->[0]{restarts}, 0, 'job has not been restarted';
ok !$batch->[1], 'no more results';
# Enqueue, dequeue and perform
is $minion->job(12345), undef, 'job does not exist';
$id = $minion->enqueue(add => [2, 2]);
ok $minion->job($id), 'job does exist';
$info = $minion->job($id)->info;
is_deeply $info->{args}, [2, 2], 'right arguments';
is $info->{priority}, 0, 'right priority';
is $info->{state}, 'inactive', 'right state';
$worker = $minion->worker;
is $worker->dequeue, undef, 'not registered';
ok !$minion->job($id)->info->{started}, 'no started timestamp';
$job = $worker->register->dequeue;
like $job->info->{created}, qr/^[\d.]+$/, 'has created timestamp';
like $job->info->{started}, qr/^[\d.]+$/, 'has started timestamp';
is_deeply $job->args, [2, 2], 'right arguments';
is $job->info->{state}, 'active', 'right state';
is $job->task, 'add', 'right task';
$id = $job->info->{worker};
is $minion->backend->worker_info($id)->{pid}, $$, 'right worker';
ok !$job->info->{finished}, 'no finished timestamp';
$job->perform;
like $job->info->{finished}, qr/^[\d.]+$/, 'has finished timestamp';
is_deeply retrieve($results), [4], 'right result';
is $job->info->{state}, 'finished', 'right state';
$worker->unregister;
$job = $minion->job($job->id);
is_deeply $job->args, [2, 2], 'right arguments';
is $job->info->{state}, 'finished', 'right state';
is $job->task, 'add', 'right task';
# Restart and remove
$id = $minion->enqueue(add => [5, 6]);
$job = $worker->register->dequeue;
is $job->info->{restarts}, 0, 'job has not been restarted';
is $job->id, $id, 'right id';
ok $job->finish, 'job finished';
ok !$worker->dequeue, 'no more jobs';
$job = $minion->job($id);
ok !$job->info->{restarted}, 'no restarted timestamp';
ok $job->restart, 'job restarted';
like $job->info->{restarted}, qr/^[\d.]+$/, 'has restarted timestamp';
is $job->info->{state}, 'inactive', 'right state';
is $job->info->{restarts}, 1, 'job has been restarted once';
$job = $worker->dequeue;
ok !$job->restart, 'job not restarted';
is $job->id, $id, 'right id';
ok !$job->remove, 'job has not been removed';
ok $job->fail, 'job failed';
ok $job->restart, 'job restarted';
is $job->info->{restarts}, 2, 'job has been restarted twice';
ok !$job->info->{finished}, 'no finished timestamp';
ok !$job->info->{started}, 'no started timestamp';
ok !$job->info->{error}, 'no error';
ok !$job->info->{worker}, 'no worker';
$job = $worker->dequeue;
is $job->info->{state}, 'active', 'right state';
ok $job->finish, 'job finished';
ok $job->remove, 'job has been removed';
is $job->info, undef, 'no information';
$id = $minion->enqueue(add => [6, 5]);
$job = $worker->dequeue;
is $job->id, $id, 'right id';
ok $job->fail, 'job failed';
ok $job->remove, 'job has been removed';
is $job->info, undef, 'no information';
$id = $minion->enqueue(add => [5, 5]);
$job = $minion->job("$id");
ok $job->remove, 'job has been removed';
$worker->unregister;
# Jobs with priority
$minion->enqueue(add => [1, 2]);
$id = $minion->enqueue(add => [2, 4], {priority => 1});
$job = $worker->register->dequeue;
is $job->id, $id, 'right id';
is $job->info->{priority}, 1, 'right priority';
ok $job->finish, 'job finished';
isnt $worker->dequeue->id, $id, 'different id';
$worker->unregister;
# Delayed jobs
my $epoch = time + 100;
$id = $minion->enqueue(add => [2, 1] => {delayed => $epoch});
is $worker->register->dequeue, undef, 'too early for job';
is $minion->job($id)->info->{delayed}, $epoch, 'right delayed timestamp';
$guard = $minion->backend->_guard->_write;
$info = $guard->_job($id);
$info->{delayed} = time - 100;
undef $guard;
$job = $worker->dequeue;
is $job->id, $id, 'right id';
like $job->info->{delayed}, qr/^[\d.]+$/, 'has delayed timestamp';
ok $job->finish, 'job finished';
$worker->unregister;
# Enqueue non-blocking
my ($fail, $result) = @_;
$minion->enqueue(
add => [23] => {priority => 1} => sub {
my ($minion, $err, $id) = @_;
$fail = $err;
$result = $id;
Mojo::IOLoop->stop;
}
);
Mojo::IOLoop->start;
ok !$fail, 'no error';
$worker = $minion->worker->register;
$job = $worker->dequeue;
is $job->id, $result, 'right id';
is_deeply $job->args, [23], 'right arguments';
is $job->info->{priority}, 1, 'right priority';
ok $job->finish, 'job finished';
$worker->unregister;
# Events
my ($failed, $finished) = (0, 0);
$minion->on(
worker => sub {
my ($minion, $worker) = @_;
$worker->on(
dequeue => sub {
my ($worker, $job) = @_;
$job->on(failed => sub { $failed++ });
$job->on(finished => sub { $finished++ });
}
);
}
);
$worker = $minion->worker->register;
$minion->enqueue(add => [3, 3]);
$minion->enqueue(add => [4, 3]);
$job = $worker->dequeue;
is $failed, 0, 'failed event has not been emitted';
is $finished, 0, 'finished event has not been emitted';
$job->finish;
$job->finish;
is $failed, 0, 'failed event has not been emitted';
is $finished, 1, 'finished event has been emitted once';
$job = $worker->dequeue;
my $err;
$job->on(failed => sub { $err = pop });
$job->fail("test\n");
$job->fail;
is $err, "test\n", 'right error';
is $failed, 1, 'failed event has been emitted once';
is $finished, 1, 'finished event has been emitted once';
$worker->unregister;
# Failed jobs
$id = $minion->enqueue(add => [5, 6]);
$job = $worker->register->dequeue;
is $job->id, $id, 'right id';
is $job->info->{error}, undef, 'no error';
ok $job->fail, 'job failed';
ok !$job->finish, 'job not finished';
is $job->info->{state}, 'failed', 'right state';
is $job->info->{error}, 'Unknown error', 'right error';
$id = $minion->enqueue(add => [6, 7]);
$job = $worker->dequeue;
is $job->id, $id, 'right id';
ok $job->fail('Something bad happened!'), 'job failed';
is $job->info->{state}, 'failed', 'right state';
is $job->info->{error}, 'Something bad happened!', 'right error';
$id = $minion->enqueue('fail');
$job = $worker->dequeue;
is $job->id, $id, 'right id';
$job->perform;
is $job->info->{state}, 'failed', 'right state';
is $job->info->{error}, "Intentional failure!\n", 'right error';
$worker->unregister;
# Exit
$id = $minion->enqueue('exit');
$job = $worker->register->dequeue;
is $job->id, $id, 'right id';
$job->perform;
is $job->info->{state}, 'failed', 'right state';
is $job->info->{error}, 'Non-zero exit status', 'right error';
$worker->unregister;
$minion->reset;
done_testing();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment