Created
June 13, 2014 04:14
-
-
Save anonymous/8899b1f7e0b507664514 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package Minion::Backend::Storable; | |
use Mojo::Base 'Minion::Backend'; | |
use List::Util 'first'; | |
use Mojo::IOLoop; | |
use Sys::Hostname 'hostname'; | |
use Time::HiRes 'time'; | |
has 'file'; | |
# Claim the next runnable job for the worker with the given id.
#
# A job is runnable when it is "inactive", its "delayed" timestamp lies in the
# past and this process has a task registered under the job's task name.
# Candidates are tried from highest to lowest priority. The chosen job is
# marked "active" with a start time and the worker id, and the job hash is
# returned; undef is returned when nothing is runnable.
sub dequeue {
  my ($self, $id) = @_;

  my $guard = $self->_guard;
  my $jobs  = $guard->_jobs;
  my $now   = time;

  # Inactive jobs that are due, highest priority first
  my @candidates = sort { $b->{priority} <=> $a->{priority} }
    grep { $_->{state} eq 'inactive' && $_->{delayed} < $now } @$jobs;

  # First candidate we actually know how to perform
  my $job;
  for my $candidate (@candidates) {
    next unless $self->minion->tasks->{$candidate->{task}};
    $job = $candidate;
    last;
  }
  return undef unless $job;

  # Only now is there something to persist
  $guard->_write;
  $job->{started} = time;
  $job->{state}   = 'active';
  $job->{worker}  = $id;

  return $job;
}
# Store a new inactive job for the given task.
#
# Arguments after the task name are an optional array reference of job
# arguments, an optional hash reference of options ("delayed", "priority")
# and an optional trailing callback. Without a callback the new job id is
# returned; with a callback the id is delivered on the next event loop tick
# as ($self, undef, $id).
sub enqueue {
  my $self = shift;
  my $task = shift;

  # A trailing code reference selects non-blocking mode
  my $cb;
  $cb = pop if ref($_[-1]) eq 'CODE';

  my ($args, $options) = @_;
  $args    //= [];
  $options //= {};

  my $guard = $self->_guard->_write;
  my $job   = {
    args     => $args,
    created  => time,
    delayed  => $options->{delayed} || 1,
    id       => $guard->_id,
    priority => $options->{priority} // 0,
    restarts => 0,
    state    => 'inactive',
    task     => $task
  };
  push @{$guard->_jobs}, $job;

  # Blocking
  my $id = $job->{id};
  return $id unless $cb;

  # Non-blocking
  Mojo::IOLoop->next_tick(sub { $self->$cb(undef, $id) });
}
# Transition a job to "failed"; remaining arguments (job id and optional
# error) are handed to _update unchanged.
sub fail_job {
  my $self = shift;
  return $self->_update(1, @_);
}
# Transition a job to "finished"; remaining arguments (the job id) are
# handed to _update unchanged.
sub finish_job {
  my $self = shift;
  return $self->_update(0, @_);
}
# Look up the stored hash for the job with the given id (undef if unknown).
sub job_info {
  my ($self, $id) = @_;
  return $self->_guard->_job($id);
}
# Return an array reference with one page of jobs, newest first.
#
# $skip is the number of jobs to skip and $limit the maximum number of jobs to
# return. The result contains only existing jobs: a page that reaches past
# the end of the list is simply shorter (or empty), never padded with undef.
sub list_jobs {
  my ($self, $skip, $limit) = @_;

  my $guard = $self->_guard;

  # "created" is a numeric timestamp, so compare numerically
  my @jobs = sort { $b->{created} <=> $a->{created} } @{$guard->_jobs};

  # Clamp the page to the available range; an array slice would otherwise
  # yield undef for every index past the last element
  my $last = $skip + $limit - 1;
  $last = $#jobs if $last > $#jobs;

  return [@jobs[$skip .. $last]];
}
# Return an array reference with one page of worker information hashes,
# most recently started worker first.
#
# $skip is the number of workers to skip and $limit the maximum number to
# return. A page that reaches past the end of the list is shorter (or
# empty), never padded with undef.
sub list_workers {
  my ($self, $skip, $limit) = @_;

  my $guard = $self->_guard;

  # "started" is a numeric timestamp, so compare numerically
  my @sorted = sort { $b->{started} <=> $a->{started} } @{$guard->_workers};

  # Clamp the page to the available range; an array slice would otherwise
  # yield undef for every index past the last element
  my $last = $skip + $limit - 1;
  $last = $#sorted if $last > $#sorted;

  # Build the (comparatively expensive) info hashes for the page only
  return [map { $self->_worker_info($guard, $_->{id}) }
      @sorted[$skip .. $last]];
}
# Construct the backend; the single argument is stored as the "file"
# attribute.
sub new {
  my ($class, $file) = @_;
  return $class->SUPER::new(file => $file);
}
# Record a new worker for the current host and process and return its
# freshly generated id.
sub register_worker {
  my $self = shift;

  my $guard  = $self->_guard->_write;
  my $id     = $guard->_id;
  my $worker = {host => hostname(), id => $id, pid => $$, started => time};
  push @{$guard->_workers}, $worker;

  return $id;
}
# Delete the job with the given id from the queue.
#
# Only jobs in state "failed", "finished" or "inactive" can be removed.
# Returns 1 on success and undef if the job is unknown or in another state.
sub remove_job {
  my ($self, $id) = @_;

  my $guard = $self->_guard->_write;
  my $job   = $guard->_job($id);
  return undef unless $job;

  my %removable = map { $_ => 1 } qw(failed finished inactive);
  return undef unless $removable{$job->{state}};

  my $jobs = $guard->_jobs;
  @$jobs = grep { $_->{id} ne $id } @$jobs;

  return 1;
}
# Clean up after workers that disappeared without unregistering.
#
# Workers registered for this host whose process no longer answers to
# signal 0 are dropped; workers on other hosts are kept as they are. Every
# "active" job whose worker is no longer registered is then marked "failed"
# with the error "Worker went away.".
sub repair {
  my $self = shift;

  my $guard = $self->_guard->_write;

  # Check workers on this host (all should be owned by the same user)
  my $workers = $guard->_workers;
  my @alive;
  for my $worker (@$workers) {
    my $is_local = $worker->{host} eq hostname();
    push @alive, $worker if !$is_local || kill(0, $worker->{pid});
  }
  @$workers = @alive;

  # Abandoned jobs
  for my $job (@{$guard->_jobs}) {
    next unless $job->{state} eq 'active';
    next if $guard->_worker($job->{worker});
    $job->{error} = 'Worker went away.';
    $job->{state} = 'failed';
  }
}
# Wipe all stored state by overwriting the data file with an empty hash.
sub reset {
  my $self = shift;
  return $self->_guard->_spurt({});
}
# Put a failed or finished job back into the queue.
#
# Increments the restart counter, stamps "restarted", resets the state to
# "inactive" and clears the fields left over from the previous run.
# Returns 1 on success and undef if the job is unknown or in another state.
sub restart_job {
  my ($self, $id) = @_;

  my $guard = $self->_guard->_write;
  my $job = $guard->_job($id) or return undef;

  my $state = $job->{state};
  return undef if $state ne 'failed' && $state ne 'finished';

  $job->{restarts}++;
  $job->{restarted} = time;
  $job->{state}     = 'inactive';
  delete @$job{qw(error finished result started worker)};

  return 1;
}
# Return a hash reference with queue statistics: the number of active and
# inactive workers and the number of jobs in each state.
#
# A worker counts as active when at least one "active" job references its id;
# a worker with several active jobs is still counted once.
sub stats {
  my $self = shift;

  my $guard = $self->_guard;
  my $jobs  = $guard->_jobs;

  # Count distinct workers, not jobs: the lookup has to be keyed on the
  # worker id (keying on the job itself would make every job unique)
  my %seen;
  my $active
    = grep { $_->{state} eq 'active' && !$seen{$_->{worker} // ''}++ } @$jobs;

  return {
    active_workers   => $active,
    inactive_workers => @{$guard->_workers} - $active,
    active_jobs      => scalar(grep { $_->{state} eq 'active' } @$jobs),
    inactive_jobs    => scalar(grep { $_->{state} eq 'inactive' } @$jobs),
    failed_jobs      => scalar(grep { $_->{state} eq 'failed' } @$jobs),
    finished_jobs    => scalar(grep { $_->{state} eq 'finished' } @$jobs),
  };
}
# Remove the worker with the given id from the list of registered workers.
sub unregister_worker {
  my ($self, $id) = @_;

  my $guard   = $self->_guard->_write;
  my $workers = $guard->_workers;
  @$workers = grep { $_->{id} ne $id } @$workers;
}
# Look up the information hash for the worker with the given id, using a
# fresh guard on the data file.
sub worker_info {
  my ($self, $id) = @_;
  return $self->_worker_info($self->_guard, $id);
}
# Create a new guard object for the configured data file.
sub _guard {
  my $self = shift;
  return Minion::Backend::Storable::_Guard->new($self->{file});
}
# Shared implementation of fail_job and finish_job.
#
# Moves the "active" job with the given id to "failed" ($fail true) or
# "finished" ($fail false), stamps "finished" and stores $err when one is
# given. Returns 1 on success and undef if the job is unknown or not active.
sub _update {
  my ($self, $fail, $id, $err) = @_;

  my $guard = $self->_guard->_write;
  my $job   = $guard->_job($id);
  return undef unless $job;
  return undef if $job->{state} ne 'active';

  $job->{finished} = time;
  if   ($fail) { $job->{state} = 'failed' }
  else         { $job->{state} = 'finished' }
  $job->{error} = $err if $err;

  return 1;
}
# Build the information hash for one worker.
#
# Looks the worker up through the given guard and returns a copy of its
# record extended with "jobs", an array reference of the ids of all jobs
# whose "worker" field equals $id. Returns undef when $id is undefined or
# no such worker is registered.
sub _worker_info {
  my ($self, $guard, $id) = @_;

  return undef unless defined $id;
  return undef unless my $worker = $guard->_worker($id);

  # Jobs that were never dequeued (or have been restarted) carry no worker
  # field, so guard the comparison against undef
  my @jobs = map { $_->{id} }
    grep { defined $_->{worker} && $_->{worker} eq $id } @{$guard->_jobs};

  return {%$worker, jobs => \@jobs};
}
package Minion::Backend::Storable::_Guard; | |
use Mojo::Base -base; | |
use Fcntl ':flock'; | |
use IO::Compress::Gzip 'gzip'; | |
use IO::Uncompress::Gunzip 'gunzip'; | |
use List::Util 'first'; | |
use Mojo::Util qw(md5_sum slurp spurt); | |
use Storable qw(freeze thaw); | |
# Flush pending changes and release the file lock when the guard goes out
# of scope.
#
# The data is written back only if _write was called on this guard. The
# unlock is skipped when no lock handle was ever opened, and the global
# status variables are localized so that destruction does not disturb an
# error or exit status the caller is still looking at.
sub DESTROY {
  my $self = shift;

  local ($., $@, $!, $^E, $?);

  $self->_spurt($self->_data) if $self->{write};

  my $lock = $self->{lock};
  flock $lock, LOCK_UN if $lock && defined fileno $lock;
}
# Construct a guard for the given data file and take an exclusive lock on
# it.
#
# The file is created with an empty hash if it does not exist yet. The lock
# handle is stored in the object, and DESTROY issues the unlock. Dies if the
# file cannot be opened or locked, since continuing without the lock would
# allow concurrent processes to overwrite each other's changes.
sub new {
  my $self = shift->SUPER::new(file => shift);

  my $file = $self->{file};
  $self->_spurt({}) unless -f $file;

  open $self->{lock}, '<', $file
    or die qq{Can't open data file "$file" for locking: $!};
  flock $self->{lock}, LOCK_EX or die qq{Can't lock data file "$file": $!};

  return $self;
}
# Return the in-memory copy of the stored data, reading it from the file on
# first use.
sub _data {
  my $self = shift;
  $self->{data} = $self->_slurp unless defined $self->{data};
  return $self->{data};
}
# Generate an id that is not yet used by any stored worker or job: an MD5
# hex digest of the current time concatenated with a random number, retried
# until it collides with nothing.
sub _id {
  my $self = shift;

  while (1) {
    my $candidate = md5_sum(time . rand 999);
    next if $self->_worker($candidate) || $self->_job($candidate);
    return $candidate;
  }
}
# Find the stored job with the given id; returns the job hash or undef.
sub _job {
  my ($self, $id) = @_;

  for my $job (@{$self->_jobs}) {
    return $job if $id eq $job->{id};
  }

  return undef;
}
# Return the array reference holding all jobs, creating an empty list in the
# data hash if there is none yet.
sub _jobs {
  my $self = shift;

  my $data = $self->_data;
  $data->{jobs} = [] unless defined $data->{jobs};

  return $data->{jobs};
}
# Read the data file, decompress it and deserialize the stored hash.
#
# Dies with a message naming the file when decompression fails or the
# content does not deserialize to a hash reference, instead of handing an
# undefined value to callers that expect a hash.
sub _slurp {
  my $self = shift;

  my $file       = $self->{file};
  my $compressed = slurp $file;

  gunzip \$compressed, \my $uncompressed
    or die qq{Can't decompress "$file": }
    . $IO::Uncompress::Gunzip::GunzipError;

  my $data = thaw $uncompressed;
  die qq{Can't deserialize "$file"} unless ref($data) eq 'HASH';

  return $data;
}
# Serialize the given data structure, compress it and write it to the data
# file, replacing the previous content.
#
# Dies when compression fails rather than writing an undefined value to the
# file.
sub _spurt {
  my ($self, $data) = @_;

  my $uncompressed = freeze $data;
  gzip \$uncompressed, \my $compressed
    or die qq{Can't compress data for "$self->{file}": }
    . $IO::Compress::Gzip::GzipError;

  spurt $compressed, $self->{file};
}
# Find the stored worker with the given id; returns the worker hash or
# undef.
sub _worker {
  my ($self, $id) = @_;

  for my $worker (@{$self->_workers}) {
    return $worker if $id eq $worker->{id};
  }

  return undef;
}
# Return the array reference holding all workers, creating an empty list in
# the data hash if there is none yet.
sub _workers {
  my $self = shift;

  my $data = $self->_data;
  $data->{workers} = [] unless defined $data->{workers};

  return $data->{workers};
}
# Flag the guard so that its data is written back to the file on
# destruction; returns the guard itself to allow chaining.
sub _write {
  my $self = shift;
  $self->{write}++;
  return $self;
}
1; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use Mojo::Base -strict; | |
use Test::More; | |
use File::Spec::Functions 'catfile'; | |
use File::Temp 'tempdir'; | |
use Minion; | |
use Mojo::IOLoop; | |
use Storable qw(retrieve store); | |
use Sys::Hostname 'hostname'; | |
# Clean up before start | |
my $tmpdir = tempdir CLEANUP => 1; | |
my $file = catfile $tmpdir, 'minion.data'; | |
my $minion = Minion->new(Storable => $file); | |
$minion->reset; | |
# Nothing to repair | |
my $worker = $minion->repair->worker; | |
isa_ok $worker->minion->app, 'Mojolicious', 'has default application'; | |
# Register and unregister | |
$worker->register; | |
like $worker->info->{started}, qr/^[\d.]+$/, 'has timestamp'; | |
is $worker->unregister->info, undef, 'no information'; | |
is $worker->register->info->{host}, hostname, 'right host'; | |
is $worker->info->{pid}, $$, 'right pid'; | |
is $worker->unregister->info, undef, 'no information'; | |
# Repair dead worker | |
$minion->add_task(test => sub { }); | |
my $worker2 = $minion->worker->register; | |
isnt $worker2->id, $worker->id, 'new id'; | |
my $id = $minion->enqueue('test'); | |
my $job = $worker2->dequeue; | |
is $job->id, $id, 'right id'; | |
is $worker2->info->{jobs}[0], $job->id, 'right id'; | |
$id = $worker2->id; | |
undef $worker2; | |
is $job->info->{state}, 'active', 'job is still active'; | |
my $guard = $minion->backend->_guard->_write; | |
my $info = $guard->_worker($id); | |
ok $info, 'is registered'; | |
my $pid = 4000; | |
$pid++ while kill 0, $pid; | |
$info->{pid} = $pid; | |
undef $guard; | |
$minion->repair; | |
ok !$minion->worker->id($id)->info, 'not registered'; | |
is $job->info->{state}, 'failed', 'job is no longer active'; | |
is $job->info->{error}, 'Worker went away.', 'right error'; | |
# Repair abandoned job | |
$worker->register; | |
$id = $minion->enqueue('test'); | |
$job = $worker->dequeue; | |
is $job->id, $id, 'right id'; | |
$worker->unregister; | |
$minion->repair; | |
is $job->info->{state}, 'failed', 'job is no longer active'; | |
is $job->info->{error}, 'Worker went away.', 'right error'; | |
# List workers | |
$worker = $minion->worker->register; | |
$worker2 = $minion->worker->register; | |
my $batch = $minion->backend->list_workers(0, 10); | |
ok $batch->[0]{id}, 'has id'; | |
is $batch->[0]{host}, hostname, 'right host'; | |
is $batch->[0]{pid}, $$, 'right pid'; | |
is $batch->[1]{host}, hostname, 'right host'; | |
is $batch->[1]{pid}, $$, 'right pid'; | |
ok !$batch->[2], 'no more results'; | |
$batch = $minion->backend->list_workers(0, 1); | |
is $batch->[0]{id}, $worker2->id, 'right id'; | |
ok !$batch->[1], 'no more results'; | |
$batch = $minion->backend->list_workers(1, 1); | |
is $batch->[0]{id}, $worker->id, 'right id'; | |
ok !$batch->[1], 'no more results'; | |
$worker->unregister; | |
$worker2->unregister; | |
# Reset | |
$minion->reset->repair; | |
ok !@{$minion->backend->_guard->_jobs}, 'no jobs'; | |
ok !@{$minion->backend->_guard->_workers}, 'no workers'; | |
# Tasks | |
my $results = catfile $tmpdir, 'results.data'; | |
store [], $results; | |
$minion->add_task( | |
add => sub { | |
my ($job, $first, $second) = @_; | |
my $result = retrieve $results; | |
push @$result, $first + $second; | |
store $result, $results; | |
} | |
); | |
$minion->add_task(exit => sub { exit 1 }); | |
$minion->add_task(fail => sub { die "Intentional failure!\n" }); | |
# Stats (worker and job counters are checked after every state change; the
# final block verifies the totals once the worker has unregistered)
my $stats = $minion->stats;
is $stats->{active_workers},   0, 'no active workers';
is $stats->{inactive_workers}, 0, 'no inactive workers';
is $stats->{active_jobs},      0, 'no active jobs';
is $stats->{failed_jobs},      0, 'no failed jobs';
is $stats->{finished_jobs},    0, 'no finished jobs';
is $stats->{inactive_jobs},    0, 'no inactive jobs';
$worker = $minion->worker->register;
is $minion->stats->{inactive_workers}, 1, 'one inactive worker';
$minion->enqueue('fail');
$minion->enqueue('fail');
is $minion->stats->{inactive_jobs}, 2, 'two inactive jobs';
$job   = $worker->dequeue;
$stats = $minion->stats;
is $stats->{active_workers}, 1, 'one active worker';
is $stats->{active_jobs},    1, 'one active job';
is $stats->{inactive_jobs},  1, 'one inactive job';
ok $job->finish, 'job finished';
is $minion->stats->{finished_jobs}, 1, 'one finished job';
$job = $worker->dequeue;
ok $job->fail, 'job failed';
is $minion->stats->{failed_jobs}, 1, 'one failed job';
ok $job->restart, 'job restarted';
is $minion->stats->{failed_jobs}, 0, 'no failed jobs';
ok $worker->dequeue->finish, 'job finished';
$worker->unregister;
$stats = $minion->stats;
is $stats->{active_workers},   0, 'no active workers';
is $stats->{inactive_workers}, 0, 'no inactive workers';
is $stats->{active_jobs},      0, 'no active jobs';
is $stats->{failed_jobs},      0, 'no failed jobs';
is $stats->{finished_jobs},    2, 'two finished jobs';
is $stats->{inactive_jobs},    0, 'no inactive jobs';
# List jobs | |
$batch = $minion->backend->list_jobs(0, 10); | |
ok $batch->[0]{id}, 'has id'; | |
is $batch->[0]{task}, 'fail', 'right task'; | |
is $batch->[0]{state}, 'finished', 'right state'; | |
is $batch->[0]{restarts}, 1, 'job has been restarted'; | |
is $batch->[1]{task}, 'fail', 'right task'; | |
is $batch->[1]{state}, 'finished', 'right state'; | |
is $batch->[1]{restarts}, 0, 'job has not been restarted'; | |
ok !$batch->[2], 'no more results'; | |
$batch = $minion->backend->list_jobs(0, 1); | |
is $batch->[0]{restarts}, 1, 'job has been restarted'; | |
ok !$batch->[1], 'no more results'; | |
$batch = $minion->backend->list_jobs(1, 1); | |
is $batch->[0]{restarts}, 0, 'job has not been restarted'; | |
ok !$batch->[1], 'no more results'; | |
# Enqueue, dequeue and perform | |
is $minion->job(12345), undef, 'job does not exist'; | |
$id = $minion->enqueue(add => [2, 2]); | |
ok $minion->job($id), 'job does exist'; | |
$info = $minion->job($id)->info; | |
is_deeply $info->{args}, [2, 2], 'right arguments'; | |
is $info->{priority}, 0, 'right priority'; | |
is $info->{state}, 'inactive', 'right state'; | |
$worker = $minion->worker; | |
is $worker->dequeue, undef, 'not registered'; | |
ok !$minion->job($id)->info->{started}, 'no started timestamp'; | |
$job = $worker->register->dequeue; | |
like $job->info->{created}, qr/^[\d.]+$/, 'has created timestamp'; | |
like $job->info->{started}, qr/^[\d.]+$/, 'has started timestamp'; | |
is_deeply $job->args, [2, 2], 'right arguments'; | |
is $job->info->{state}, 'active', 'right state'; | |
is $job->task, 'add', 'right task'; | |
$id = $job->info->{worker}; | |
is $minion->backend->worker_info($id)->{pid}, $$, 'right worker'; | |
ok !$job->info->{finished}, 'no finished timestamp'; | |
$job->perform; | |
like $job->info->{finished}, qr/^[\d.]+$/, 'has finished timestamp'; | |
is_deeply retrieve($results), [4], 'right result'; | |
is $job->info->{state}, 'finished', 'right state'; | |
$worker->unregister; | |
$job = $minion->job($job->id); | |
is_deeply $job->args, [2, 2], 'right arguments'; | |
is $job->info->{state}, 'finished', 'right state'; | |
is $job->task, 'add', 'right task'; | |
# Restart and remove | |
$id = $minion->enqueue(add => [5, 6]); | |
$job = $worker->register->dequeue; | |
is $job->info->{restarts}, 0, 'job has not been restarted'; | |
is $job->id, $id, 'right id'; | |
ok $job->finish, 'job finished'; | |
ok !$worker->dequeue, 'no more jobs'; | |
$job = $minion->job($id); | |
ok !$job->info->{restarted}, 'no restarted timestamp'; | |
ok $job->restart, 'job restarted'; | |
like $job->info->{restarted}, qr/^[\d.]+$/, 'has restarted timestamp'; | |
is $job->info->{state}, 'inactive', 'right state'; | |
is $job->info->{restarts}, 1, 'job has been restarted once'; | |
$job = $worker->dequeue; | |
ok !$job->restart, 'job not restarted'; | |
is $job->id, $id, 'right id'; | |
ok !$job->remove, 'job has not been removed'; | |
ok $job->fail, 'job failed'; | |
ok $job->restart, 'job restarted'; | |
is $job->info->{restarts}, 2, 'job has been restarted twice'; | |
ok !$job->info->{finished}, 'no finished timestamp'; | |
ok !$job->info->{started}, 'no started timestamp'; | |
ok !$job->info->{error}, 'no error'; | |
ok !$job->info->{worker}, 'no worker'; | |
$job = $worker->dequeue; | |
is $job->info->{state}, 'active', 'right state'; | |
ok $job->finish, 'job finished'; | |
ok $job->remove, 'job has been removed'; | |
is $job->info, undef, 'no information'; | |
$id = $minion->enqueue(add => [6, 5]); | |
$job = $worker->dequeue; | |
is $job->id, $id, 'right id'; | |
ok $job->fail, 'job failed'; | |
ok $job->remove, 'job has been removed'; | |
is $job->info, undef, 'no information'; | |
$id = $minion->enqueue(add => [5, 5]); | |
$job = $minion->job("$id"); | |
ok $job->remove, 'job has been removed'; | |
$worker->unregister; | |
# Jobs with priority | |
$minion->enqueue(add => [1, 2]); | |
$id = $minion->enqueue(add => [2, 4], {priority => 1}); | |
$job = $worker->register->dequeue; | |
is $job->id, $id, 'right id'; | |
is $job->info->{priority}, 1, 'right priority'; | |
ok $job->finish, 'job finished'; | |
isnt $worker->dequeue->id, $id, 'different id'; | |
$worker->unregister; | |
# Delayed jobs | |
my $epoch = time + 100; | |
$id = $minion->enqueue(add => [2, 1] => {delayed => $epoch}); | |
is $worker->register->dequeue, undef, 'too early for job'; | |
is $minion->job($id)->info->{delayed}, $epoch, 'right delayed timestamp'; | |
$guard = $minion->backend->_guard->_write; | |
$info = $guard->_job($id); | |
$info->{delayed} = time - 100; | |
undef $guard; | |
$job = $worker->dequeue; | |
is $job->id, $id, 'right id'; | |
like $job->info->{delayed}, qr/^[\d.]+$/, 'has delayed timestamp'; | |
ok $job->finish, 'job finished'; | |
$worker->unregister; | |
# Enqueue non-blocking (the callback receives the error and the new job id
# once the event loop runs; the job is then dequeued and finished normally)
my ($fail, $result);
$minion->enqueue(
  add => [23] => {priority => 1} => sub {
    my ($minion, $err, $id) = @_;
    $fail   = $err;
    $result = $id;
    Mojo::IOLoop->stop;
  }
);
Mojo::IOLoop->start;
ok !$fail, 'no error';
$worker = $minion->worker->register;
$job    = $worker->dequeue;
is $job->id, $result, 'right id';
is_deeply $job->args, [23], 'right arguments';
is $job->info->{priority}, 1, 'right priority';
ok $job->finish, 'job finished';
$worker->unregister;
# Events | |
my ($failed, $finished) = (0, 0); | |
$minion->on( | |
worker => sub { | |
my ($minion, $worker) = @_; | |
$worker->on( | |
dequeue => sub { | |
my ($worker, $job) = @_; | |
$job->on(failed => sub { $failed++ }); | |
$job->on(finished => sub { $finished++ }); | |
} | |
); | |
} | |
); | |
$worker = $minion->worker->register; | |
$minion->enqueue(add => [3, 3]); | |
$minion->enqueue(add => [4, 3]); | |
$job = $worker->dequeue; | |
is $failed, 0, 'failed event has not been emitted'; | |
is $finished, 0, 'finished event has not been emitted'; | |
$job->finish; | |
$job->finish; | |
is $failed, 0, 'failed event has not been emitted'; | |
is $finished, 1, 'finished event has been emitted once'; | |
$job = $worker->dequeue; | |
my $err; | |
$job->on(failed => sub { $err = pop }); | |
$job->fail("test\n"); | |
$job->fail; | |
is $err, "test\n", 'right error'; | |
is $failed, 1, 'failed event has been emitted once'; | |
is $finished, 1, 'finished event has been emitted once'; | |
$worker->unregister; | |
# Failed jobs | |
$id = $minion->enqueue(add => [5, 6]); | |
$job = $worker->register->dequeue; | |
is $job->id, $id, 'right id'; | |
is $job->info->{error}, undef, 'no error'; | |
ok $job->fail, 'job failed'; | |
ok !$job->finish, 'job not finished'; | |
is $job->info->{state}, 'failed', 'right state'; | |
is $job->info->{error}, 'Unknown error', 'right error'; | |
$id = $minion->enqueue(add => [6, 7]); | |
$job = $worker->dequeue; | |
is $job->id, $id, 'right id'; | |
ok $job->fail('Something bad happened!'), 'job failed'; | |
is $job->info->{state}, 'failed', 'right state'; | |
is $job->info->{error}, 'Something bad happened!', 'right error'; | |
$id = $minion->enqueue('fail'); | |
$job = $worker->dequeue; | |
is $job->id, $id, 'right id'; | |
$job->perform; | |
is $job->info->{state}, 'failed', 'right state'; | |
is $job->info->{error}, "Intentional failure!\n", 'right error'; | |
$worker->unregister; | |
# Exit | |
$id = $minion->enqueue('exit'); | |
$job = $worker->register->dequeue; | |
is $job->id, $id, 'right id'; | |
$job->perform; | |
is $job->info->{state}, 'failed', 'right state'; | |
is $job->info->{error}, 'Non-zero exit status', 'right error'; | |
$worker->unregister; | |
$minion->reset; | |
done_testing(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment