-
-
Save anonymous/0aaa8480030530aaf854dba405a0dbc3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/Changes b/Changes | |
index 1115341..c86889d 100644 | |
--- a/Changes | |
+++ b/Changes | |
@@ -1,5 +1,11 @@ | |
-5.10 2016-09-14 | |
+6.0 2016-09-16 | |
+ - Removed TTIN, TTOU and USR1 signals from worker command. | |
+ - Added support for worker remote control commands. | |
+ - Added commands attribute to Minion::Worker. | |
+ - Added add_command and process_commands methods to Minion::Worker. | |
+ - Added receive_commands and send_command methods to Minion::Backend::Pg. | |
+ - Improved worker command with support for jobs remote control command. | |
5.09 2016-08-31 | |
- Added EXPERIMENTAL enqueued_jobs field to stats methods in Minion and | |
diff --git a/lib/Minion.pm b/lib/Minion.pm | |
index ead13d4..7710fff 100644 | |
--- a/lib/Minion.pm | |
+++ b/lib/Minion.pm | |
@@ -15,7 +15,7 @@ has missing_after => 1800; | |
has remove_after => 172800; | |
has tasks => sub { {} }; | |
-our $VERSION = '5.10'; | |
+our $VERSION = '6.0'; | |
sub add_task { ($_[0]->tasks->{$_[1]} = $_[2]) and return $_[0] } | |
diff --git a/lib/Minion/Backend.pm b/lib/Minion/Backend.pm | |
index bdea6fd..32c22cc 100644 | |
--- a/lib/Minion/Backend.pm | |
+++ b/lib/Minion/Backend.pm | |
@@ -13,15 +13,20 @@ sub job_info { croak 'Method "job_info" not implemented by subclass' } | |
sub list_jobs { croak 'Method "list_jobs" not implemented by subclass' } | |
sub list_workers { croak 'Method "list_workers" not implemented by subclass' } | |
+# Abstract method, subclasses have to provide the implementation | |
+sub receive_commands { | |
+  croak q{Method "receive_commands" not implemented by subclass}; | |
+} | |
+ | |
sub register_worker { | |
croak 'Method "register_worker" not implemented by subclass'; | |
} | |
-sub remove_job { croak 'Method "remove_job" not implemented by subclass' } | |
-sub repair { croak 'Method "repair" not implemented by subclass' } | |
-sub reset { croak 'Method "reset" not implemented by subclass' } | |
-sub retry_job { croak 'Method "retry_job" not implemented by subclass' } | |
-sub stats { croak 'Method "stats" not implemented by subclass' } | |
+# Abstract methods, each one croaks unless it is overridden by a subclass | |
+sub remove_job { croak 'Method "remove_job" not implemented by subclass' } | |
+sub repair { croak 'Method "repair" not implemented by subclass' } | |
+sub reset { croak 'Method "reset" not implemented by subclass' } | |
+sub retry_job { croak 'Method "retry_job" not implemented by subclass' } | |
+sub send_command { croak 'Method "send_command" not implemented by subclass' } | |
+sub stats { croak 'Method "stats" not implemented by subclass' } | |
sub unregister_worker { | |
croak 'Method "unregister_worker" not implemented by subclass'; | |
@@ -49,11 +54,13 @@ Minion::Backend - Backend base class | |
sub job_info {...} | |
sub list_jobs {...} | |
sub list_workers {...} | |
+ sub receive_commands {...} | |
sub register_worker {...} | |
sub remove_job {...} | |
sub repair {...} | |
sub reset {...} | |
sub retry_job {...} | |
+ sub send_command {...} | |
sub stats {...} | |
sub unregister_worker {...} | |
sub worker_info {...} | |
@@ -351,6 +358,12 @@ List only jobs for this task. | |
Returns the same information as L</"worker_info"> but in batches. Meant to be | |
overloaded in a subclass. | |
+=head2 receive_commands | |
+ | |
+ my $commands = $backend->receive_commands($worker_id); | |
+ | |
+Receive worker remote control commands. Meant to be overloaded in a subclass. | |
+ | |
=head2 register_worker | |
my $worker_id = $backend->register_worker; | |
@@ -412,6 +425,12 @@ Queue to put job in. | |
=back | |
+=head2 send_command | |
+ | |
+ my $bool = $backend->send_command($worker_id, 'some_command', [@args]); | |
+ | |
+Send worker remote control command. Meant to be overloaded in a subclass. | |
+ | |
=head2 stats | |
my $stats = $backend->stats; | |
diff --git a/lib/Minion/Backend/Pg.pm b/lib/Minion/Backend/Pg.pm | |
index b1b36df..51c0d86 100644 | |
--- a/lib/Minion/Backend/Pg.pm | |
+++ b/lib/Minion/Backend/Pg.pm | |
@@ -85,6 +85,15 @@ sub new { | |
return $self; | |
} | |
+# Fetch and clear the inbox of a worker in a single statement. Returns an array | |
+# reference with the pending commands in the order they were appended, each one | |
+# an array reference of the form [$name, @args]. Returns an empty array | |
+# reference if nothing is pending. | |
+sub receive_commands { | |
+  my ($self, $id) = @_; | |
+ | |
+  # The subquery locks the row, so the previous inbox value can be returned | |
+  # while the inbox is being reset | |
+  my $array = $self->pg->db->query( | |
+    "update minion_workers as new set inbox = '[]' | |
+     from (select id, inbox from minion_workers where id = ? for update) as old | |
+     where new.id = old.id and old.inbox != '[]' | |
+     returning old.inbox", $id | |
+  )->expand->array; | |
+ | |
+  # No row comes back for an empty inbox or an unknown worker, and | |
+  # dereferencing the missing row would be a fatal error | |
+  return $array ? $array->[0] : []; | |
+} | |
+ | |
sub register_worker { | |
my ($self, $id) = @_; | |
@@ -162,6 +171,13 @@ sub retry_job { | |
)->rows; | |
} | |
+# Append a remote control command to the inbox of a worker. The arguments are | |
+# an optional array reference. Returns true if a worker row was updated and | |
+# false otherwise. | |
+sub send_command { | |
+  my ($self, $id, $command, $args) = @_; | |
+ | |
+  # Dereferencing a missing array reference would be a fatal error, so treat | |
+  # omitted arguments as an empty list | |
+  my $entry = [$command, @{$args || []}]; | |
+ | |
+  # Wrapped in another array, because the jsonb "||" operator concatenates two | |
+  # arrays and the command has to stay a single element of the inbox | |
+  return !!$self->pg->db->query( | |
+    'update minion_workers set inbox = inbox || $2::jsonb | |
+     where id = $1', $id, {json => [$entry]} | |
+  )->rows; | |
+} | |
+ | |
sub stats { | |
my $self = shift; | |
@@ -549,6 +565,12 @@ Returns the same information as L</"worker_info"> but in batches. | |
Construct a new L<Minion::Backend::Pg> object. | |
+=head2 receive_commands | |
+ | |
+ my $commands = $backend->receive_commands($worker_id); | |
+ | |
+Receive worker remote control commands. | |
+ | |
=head2 register_worker | |
my $worker_id = $backend->register_worker; | |
@@ -606,6 +628,12 @@ Queue to put job in. | |
=back | |
+=head2 send_command | |
+ | |
+ my $bool = $backend->send_command($worker_id, 'some_command', [@args]); | |
+ | |
+Send worker remote control command. | |
+ | |
=head2 stats | |
my $stats = $backend->stats; | |
@@ -812,3 +840,7 @@ alter table minion_jobs add column parents bigint[] default '{}'::bigint[]; | |
-- 11 up | |
create index on minion_jobs (state, priority desc, id); | |
+ | |
+-- 12 up | |
+alter table minion_workers add column inbox jsonb | |
+ check(jsonb_typeof(inbox) = 'array') default '[]'; | |
diff --git a/lib/Minion/Command/minion/job.pm b/lib/Minion/Command/minion/job.pm | |
index e7935a5..5dcf0cc 100644 | |
--- a/lib/Minion/Command/minion/job.pm | |
+++ b/lib/Minion/Command/minion/job.pm | |
@@ -15,6 +15,7 @@ sub run { | |
GetOptionsFromArray \@args, | |
'A|attempts=i' => \$options->{attempts}, | |
'a|args=s' => sub { $args = decode_json($_[1]) }, | |
+ 'c|command=s' => (\my $command), | |
'd|delay=i' => \$options->{delay}, | |
'e|enqueue=s' => \my $enqueue, | |
'l|limit=i' => \(my $limit = 100), | |
@@ -30,6 +31,10 @@ sub run { | |
'w|workers' => \my $workers; | |
my $id = @args ? shift @args : undef; | |
+ # Worker remote control command | |
+ return $self->app->minion->backend->send_command($id, $command, $args) | |
+ if $command; | |
+ | |
# Enqueue | |
return say $self->app->minion->enqueue($enqueue, $args, $options) if $enqueue; | |
@@ -91,11 +96,14 @@ Minion::Command::minion::job - Minion job command | |
./myapp.pl minion job -e foo -P 10023 -P 10024 -p 5 -q important | |
./myapp.pl minion job -R -d 10 10023 | |
./myapp.pl minion job -r 10023 | |
+ ./myapp.pl minion job -c jobs -a '[12]' 23 | |
Options: | |
-A, --attempts <number> Number of times performing this new job will be | |
attempted, defaults to 1 | |
- -a, --args <JSON array> Arguments for new job in JSON format | |
+ -a, --args <JSON array> Arguments for new job or worker remote control | |
+ command in JSON format | |
+ -c, --command <name> Send worker remote control command | |
-d, --delay <seconds> Delay new job for this many seconds | |
-e, --enqueue <name> New job to be enqueued | |
-h, --help Show this summary of available options | |
diff --git a/lib/Minion/Command/minion/worker.pm b/lib/Minion/Command/minion/worker.pm | |
index 55363d5..14ddec1 100644 | |
--- a/lib/Minion/Command/minion/worker.pm | |
+++ b/lib/Minion/Command/minion/worker.pm | |
@@ -11,6 +11,7 @@ sub run { | |
my ($self, @args) = @_; | |
GetOptionsFromArray \@args, | |
+ 'C|command-interval=i' => \($self->{commands} = 10), | |
'I|heartbeat-interval=i' => \($self->{hearthbeat} = 60), | |
'j|jobs=i' => \($self->{max} = 4), | |
'q|queue=s' => \my @queues, | |
@@ -21,14 +22,12 @@ sub run { | |
local $SIG{INT} = local $SIG{TERM} = sub { $self->{finished}++ }; | |
local $SIG{QUIT} | |
= sub { ++$self->{finished} and kill 'KILL', keys %{$self->{jobs}} }; | |
- local $SIG{TTIN} = sub { $self->{max}++ }; | |
- local $SIG{TTOU} = sub { $self->{max}-- if $self->{max} > 0 }; | |
- local $SIG{USR1} = sub { $self->{max} = 0 }; | |
# Log fatal errors | |
my $app = $self->app; | |
$app->log->debug("Worker $$ started"); | |
my $worker = $self->{worker} = $app->minion->worker; | |
+ $worker->add_command(jobs => sub { $self->{max} = pop }); | |
eval { $self->_work until $self->{finished} && !keys %{$self->{jobs}}; 1 } | |
or $app->log->fatal("Worker error: $@"); | |
$worker->unregister; | |
@@ -42,6 +41,10 @@ sub _work { | |
$worker->register and $self->{register} = steady_time + $self->{hearthbeat} | |
if ($self->{register} || 0) < steady_time; | |
+ # Process worker remote control commands in regular intervals | |
+ $worker->process_commands and $self->{pc} = steady_time + $self->{commands} | |
+ if ($self->{pc} || 0) < steady_time; | |
+ | |
# Repair in regular intervals (randomize to avoid congestion) | |
if (($self->{check} || 0) < steady_time) { | |
my $app = $self->app; | |
@@ -80,10 +83,12 @@ Minion::Command::minion::worker - Minion worker command | |
Usage: APPLICATION minion worker [OPTIONS] | |
./myapp.pl minion worker | |
- ./myapp.pl minion worker -m production -I 15 -R 3600 -j 10 | |
+ ./myapp.pl minion worker -m production -I 15 -C 5 -R 3600 -j 10 | |
./myapp.pl minion worker -q important -q default | |
Options: | |
+ -C, --command-interval <seconds> Worker remote control command interval, | |
+ defaults to 10 | |
-h, --help Show this summary of available options | |
--home <path> Path to home directory of your | |
application, defaults to the value of | |
@@ -117,19 +122,18 @@ Stop gracefully after finishing the current jobs. | |
Stop immediately without finishing the current jobs. | |
-=head2 TTIN | |
- | |
-Increase the number of jobs to perform concurrently by one. | |
+=head1 REMOTE CONTROL COMMANDS | |
-=head2 TTOU | |
+The L<Minion::Command::minion::worker> process can be controlled at runtime | |
+with the following remote control commands. | |
-Decrease the number of jobs to perform concurrently by one. | |
+=head2 jobs | |
-=head2 USR1 | |
+ $ ./myapp.pl minion job -c jobs -a '[10]' 23 | |
-Pause the worker by setting the number of jobs to perform concurrently to zero. | |
-That means it will finish all current jobs, but not accept new ones, until the | |
-number is increased again with L</"TTIN">. | |
+Change the number of jobs to perform concurrently. Setting this value to C<0> | |
+will effectively pause the worker. That means all current jobs will be finished, | |
+but no new ones accepted, until the number is increased again. | |
=head1 ATTRIBUTES | |
diff --git a/lib/Minion/Worker.pm b/lib/Minion/Worker.pm | |
index db5eb80..793e37a 100644 | |
--- a/lib/Minion/Worker.pm | |
+++ b/lib/Minion/Worker.pm | |
@@ -1,8 +1,11 @@ | |
package Minion::Worker; | |
use Mojo::Base 'Mojo::EventEmitter'; | |
+has commands => sub { {} }; | |
has [qw(id minion)]; | |
+# Register a callback for a remote control command. Returns the worker to | |
+# allow chaining, as long as the callback is a true value. | |
+sub add_command { | |
+  my ($self, $name, $cb) = @_; | |
+  return $self if $self->commands->{$name} = $cb; | |
+} | |
+ | |
sub dequeue { | |
my ($self, $wait, $options) = @_; | |
@@ -24,6 +27,19 @@ sub dequeue { | |
sub info { $_[0]->minion->backend->worker_info($_[0]->id) } | |
+# Receive the pending remote control commands for this worker from the backend | |
+# and invoke the registered callback for each of them, in order. Commands | |
+# without a registered callback are ignored. Returns the worker. | |
+sub process_commands { | |
+  my $self = shift; | |
+ | |
+  my $inbox = $self->minion->backend->receive_commands($self->id); | |
+  while (my $entry = shift @$inbox) { | |
+ | |
+    # Every entry has the form [$name, @args] | |
+    my ($name, @args) = @$entry; | |
+    my $handler = $self->commands->{$name} or next; | |
+    $self->$handler(@args); | |
+  } | |
+ | |
+  return $self; | |
+} | |
+ | |
sub register { $_[0]->id($_[0]->minion->backend->register_worker($_[0]->id)) } | |
sub unregister { | |
@@ -74,6 +90,13 @@ Emitted in the worker process after a job has been dequeued. | |
L<Minion::Worker> implements the following attributes. | |
+=head2 commands | |
+ | |
+ my $commands = $worker->commands; | |
+ $worker = $worker->commands({jobs => sub {...}}); | |
+ | |
+Registered worker remote control commands. | |
+ | |
=head2 id | |
my $id = $worker->id; | |
@@ -93,6 +116,17 @@ L<Minion> object this worker belongs to. | |
L<Minion::Worker> inherits all methods from L<Mojo::EventEmitter> and | |
implements the following new ones. | |
+=head2 add_command | |
+ | |
+ $worker = $worker->add_command(jobs => sub {...}); | |
+ | |
+Register a worker remote control command. | |
+ | |
+ $worker->add_command(foo => sub { | |
+ my ($worker, @args) = @_; | |
+ ... | |
+ }); | |
+ | |
=head2 dequeue | |
my $job = $worker->dequeue(0.5); | |
@@ -159,6 +193,12 @@ Epoch time worker was started. | |
=back | |
+=head2 process_commands | |
+ | |
+ $worker = $worker->process_commands; | |
+ | |
+Process worker remote control commands. | |
+ | |
=head2 register | |
$worker = $worker->register; | |
diff --git a/t/pg.t b/t/pg.t | |
index 9341318..5d8cc15 100644 | |
--- a/t/pg.t | |
+++ b/t/pg.t | |
@@ -24,11 +24,11 @@ my $worker = $minion->repair->worker; | |
isa_ok $worker->minion->app, 'Mojolicious', 'has default application'; | |
# Migrate up and down | |
-is $minion->backend->pg->migrations->active, 11, 'active version is 11'; | |
+is $minion->backend->pg->migrations->active, 12, 'active version is 12'; | |
is $minion->backend->pg->migrations->migrate(0)->active, 0, | |
'active version is 0'; | |
-is $minion->backend->pg->migrations->migrate->active, 11, | |
- 'active version is 11'; | |
+is $minion->backend->pg->migrations->migrate->active, 12, | |
+ 'active version is 12'; | |
# Register and unregister | |
$worker->register; | |
@@ -641,6 +641,24 @@ is $minion->job($id)->info->{state}, 'failed', 'right state'; | |
is $minion->job($id)->info->{result}, 'Parent went away', 'right result'; | |
$worker->unregister; | |
+# Worker remote control commands | |
+$worker = $minion->worker->register; | |
+my @commands; | |
+# test_id records the id of the worker, test_args the arguments it received | |
+$worker->add_command(test_id => sub { push @commands, shift->id })->register; | |
+$worker->add_command(test_args => sub { shift and push @commands, [@_] }); | |
+$minion->backend->send_command($worker->id, 'test_id', []); | |
+$worker->process_commands->register; | |
+is_deeply \@commands, [$worker->id], 'right structure'; | |
+@commands = (); | |
+# Multiple commands in one batch, test_whatever has no registered callback and | |
+# is expected to leave no trace in the result | |
+$minion->backend->send_command($worker->id, 'test_id', []); | |
+$minion->backend->send_command($worker->id, 'test_whatever', []); | |
+$minion->backend->send_command($worker->id, 'test_args', | |
+ [1, [2], {3 => 'three'}]); | |
+$worker->process_commands; | |
+is_deeply \@commands, [$worker->id, [1, [2], {3 => 'three'}]], | |
+ 'right structure'; | |
+$worker->unregister; | |
+ | |
# Clean up once we are done | |
$pg->db->query('drop schema minion_test cascade'); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment