Skip to content

Instantly share code, notes, and snippets.

/wrcc.diff Secret

Created September 16, 2016 14:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/0aaa8480030530aaf854dba405a0dbc3 to your computer and use it in GitHub Desktop.
diff --git a/Changes b/Changes
index 1115341..c86889d 100644
--- a/Changes
+++ b/Changes
@@ -1,5 +1,11 @@
-5.10 2016-09-14
+6.0 2016-09-16
+ - Removed TTIN, TTOU and USR1 signals from worker command.
+ - Added support for worker remote control commands.
+ - Added commands attribute to Minion::Worker.
+ - Added add_command and process_commands methods to Minion::Worker.
+ - Added receive_commands and send_command methods to Minion::Backend::Pg.
+ - Improved worker command with support for jobs remote control command.
5.09 2016-08-31
- Added EXPERIMENTAL enqueued_jobs field to stats methods in Minion and
diff --git a/lib/Minion.pm b/lib/Minion.pm
index ead13d4..7710fff 100644
--- a/lib/Minion.pm
+++ b/lib/Minion.pm
@@ -15,7 +15,7 @@ has missing_after => 1800;
has remove_after => 172800;
has tasks => sub { {} };
-our $VERSION = '5.10';
+our $VERSION = '6.0';
sub add_task { ($_[0]->tasks->{$_[1]} = $_[2]) and return $_[0] }
diff --git a/lib/Minion/Backend.pm b/lib/Minion/Backend.pm
index bdea6fd..32c22cc 100644
--- a/lib/Minion/Backend.pm
+++ b/lib/Minion/Backend.pm
@@ -13,15 +13,20 @@ sub job_info { croak 'Method "job_info" not implemented by subclass' }
sub list_jobs { croak 'Method "list_jobs" not implemented by subclass' }
sub list_workers { croak 'Method "list_workers" not implemented by subclass' }
+sub receive_commands {
+ croak 'Method "receive_commands" not implemented by subclass';
+}
+
sub register_worker {
croak 'Method "register_worker" not implemented by subclass';
}
-sub remove_job { croak 'Method "remove_job" not implemented by subclass' }
-sub repair { croak 'Method "repair" not implemented by subclass' }
-sub reset { croak 'Method "reset" not implemented by subclass' }
-sub retry_job { croak 'Method "retry_job" not implemented by subclass' }
-sub stats { croak 'Method "stats" not implemented by subclass' }
+sub remove_job { croak 'Method "remove_job" not implemented by subclass' }
+sub repair { croak 'Method "repair" not implemented by subclass' }
+sub reset { croak 'Method "reset" not implemented by subclass' }
+sub retry_job { croak 'Method "retry_job" not implemented by subclass' }
+sub send_command { croak 'Method "send_command" not implemented by subclass' }
+sub stats { croak 'Method "stats" not implemented by subclass' }
sub unregister_worker {
croak 'Method "unregister_worker" not implemented by subclass';
@@ -49,11 +54,13 @@ Minion::Backend - Backend base class
sub job_info {...}
sub list_jobs {...}
sub list_workers {...}
+ sub receive_commands {...}
sub register_worker {...}
sub remove_job {...}
sub repair {...}
sub reset {...}
sub retry_job {...}
+ sub send_command {...}
sub stats {...}
sub unregister_worker {...}
sub worker_info {...}
@@ -351,6 +358,12 @@ List only jobs for this task.
Returns the same information as L</"worker_info"> but in batches. Meant to be
overloaded in a subclass.
+=head2 receive_commands
+
+ my $commands = $backend->receive_commands($worker_id);
+
+Receive worker remote control commands. Meant to be overloaded in a subclass.
+
=head2 register_worker
my $worker_id = $backend->register_worker;
@@ -412,6 +425,12 @@ Queue to put job in.
=back
+=head2 send_command
+
+ my $bool = $backend->send_command($worker_id, 'some_command', [@args]);
+
+Send worker remote control command. Meant to be overloaded in a subclass.
+
=head2 stats
my $stats = $backend->stats;
diff --git a/lib/Minion/Backend/Pg.pm b/lib/Minion/Backend/Pg.pm
index b1b36df..51c0d86 100644
--- a/lib/Minion/Backend/Pg.pm
+++ b/lib/Minion/Backend/Pg.pm
@@ -85,6 +85,15 @@ sub new {
return $self;
}
+# Fetch and clear the pending remote control commands of one worker.
+#
+# Takes the worker id and returns an array reference with the commands that
+# were stored in the worker's inbox. The inbox is reset to '[]' by the same
+# statement that reads it, with the row locked "for update".
+sub receive_commands {
+  my ($self, $id) = @_;
+
+  my $array = $self->pg->db->query(
+ "update minion_workers as new set inbox = '[]'
+ from (select id, inbox from minion_workers where id = ? for update) as old
+ where new.id = old.id and old.inbox != '[]'
+ returning old.inbox", $id
+  )->expand->array;
+
+  # No row comes back when the inbox is empty or the worker is unknown, and
+  # dereferencing the missing row would die, so fall back to an empty list
+  return $array ? $array->[0] : [];
+}
+
sub register_worker {
my ($self, $id) = @_;
@@ -162,6 +171,13 @@ sub retry_job {
)->rows;
}
+# Append one remote control command to the inbox of a worker.
+#
+# Takes the worker id, the command name and an optional array reference with
+# arguments. Returns true if a worker row was updated and false otherwise.
+sub send_command {
+  my ($self, $id, $command, $args) = @_;
+
+  # Wrapped in an outer array so the jsonb "||" operator appends the command
+  # as a single inbox element, missing arguments are treated as an empty list
+  my $entry = [[$command, @{$args || []}]];
+
+  return !!$self->pg->db->query(
+ 'update minion_workers set inbox = inbox || $2::jsonb
+ where id = $1', $id, {json => $entry}
+  )->rows;
+}
+
sub stats {
my $self = shift;
@@ -549,6 +565,12 @@ Returns the same information as L</"worker_info"> but in batches.
Construct a new L<Minion::Backend::Pg> object.
+=head2 receive_commands
+
+ my $commands = $backend->receive_commands($worker_id);
+
+Receive worker remote control commands and remove them from the inbox of the
+worker.
+
=head2 register_worker
my $worker_id = $backend->register_worker;
@@ -606,6 +628,12 @@ Queue to put job in.
=back
+=head2 send_command
+
+ my $bool = $backend->send_command($worker_id, 'some_command', [@args]);
+
+Send worker remote control command. Returns a true value if a worker with that
+id was found.
+
=head2 stats
my $stats = $backend->stats;
@@ -812,3 +840,7 @@ alter table minion_jobs add column parents bigint[] default '{}'::bigint[];
-- 11 up
create index on minion_jobs (state, priority desc, id);
+
+-- 12 up
+-- Inbox for worker remote control commands, must be a JSON array. A null
+-- inbox would pass the check and stay null when commands are appended with
+-- "||", so it is ruled out as well.
+alter table minion_workers add column inbox jsonb
+ check(jsonb_typeof(inbox) = 'array') not null default '[]';
diff --git a/lib/Minion/Command/minion/job.pm b/lib/Minion/Command/minion/job.pm
index e7935a5..5dcf0cc 100644
--- a/lib/Minion/Command/minion/job.pm
+++ b/lib/Minion/Command/minion/job.pm
@@ -15,6 +15,7 @@ sub run {
GetOptionsFromArray \@args,
'A|attempts=i' => \$options->{attempts},
'a|args=s' => sub { $args = decode_json($_[1]) },
+ 'c|command=s' => (\my $command),
'd|delay=i' => \$options->{delay},
'e|enqueue=s' => \my $enqueue,
'l|limit=i' => \(my $limit = 100),
@@ -30,6 +31,10 @@ sub run {
'w|workers' => \my $workers;
my $id = @args ? shift @args : undef;
+ # Worker remote control command
+ return $self->app->minion->backend->send_command($id, $command, $args)
+ if $command;
+
# Enqueue
return say $self->app->minion->enqueue($enqueue, $args, $options) if $enqueue;
@@ -91,11 +96,14 @@ Minion::Command::minion::job - Minion job command
./myapp.pl minion job -e foo -P 10023 -P 10024 -p 5 -q important
./myapp.pl minion job -R -d 10 10023
./myapp.pl minion job -r 10023
+ ./myapp.pl minion job -c jobs -a '[12]' 23
Options:
-A, --attempts <number> Number of times performing this new job will be
attempted, defaults to 1
- -a, --args <JSON array> Arguments for new job in JSON format
+ -a, --args <JSON array> Arguments for new job or worker remote control
+ command in JSON format
+ -c, --command <name> Send worker remote control command
-d, --delay <seconds> Delay new job for this many seconds
-e, --enqueue <name> New job to be enqueued
-h, --help Show this summary of available options
diff --git a/lib/Minion/Command/minion/worker.pm b/lib/Minion/Command/minion/worker.pm
index 55363d5..14ddec1 100644
--- a/lib/Minion/Command/minion/worker.pm
+++ b/lib/Minion/Command/minion/worker.pm
@@ -11,6 +11,7 @@ sub run {
my ($self, @args) = @_;
GetOptionsFromArray \@args,
+ 'C|command-interval=i' => \($self->{commands} = 10),
'I|heartbeat-interval=i' => \($self->{hearthbeat} = 60),
'j|jobs=i' => \($self->{max} = 4),
'q|queue=s' => \my @queues,
@@ -21,14 +22,12 @@ sub run {
local $SIG{INT} = local $SIG{TERM} = sub { $self->{finished}++ };
local $SIG{QUIT}
= sub { ++$self->{finished} and kill 'KILL', keys %{$self->{jobs}} };
- local $SIG{TTIN} = sub { $self->{max}++ };
- local $SIG{TTOU} = sub { $self->{max}-- if $self->{max} > 0 };
- local $SIG{USR1} = sub { $self->{max} = 0 };
# Log fatal errors
my $app = $self->app;
$app->log->debug("Worker $$ started");
my $worker = $self->{worker} = $app->minion->worker;
+ $worker->add_command(jobs => sub { $self->{max} = pop });
eval { $self->_work until $self->{finished} && !keys %{$self->{jobs}}; 1 }
or $app->log->fatal("Worker error: $@");
$worker->unregister;
@@ -42,6 +41,10 @@ sub _work {
$worker->register and $self->{register} = steady_time + $self->{hearthbeat}
if ($self->{register} || 0) < steady_time;
+ # Process worker remote control commands in regular intervals
+ $worker->process_commands and $self->{pc} = steady_time + $self->{commands}
+ if ($self->{pc} || 0) < steady_time;
+
# Repair in regular intervals (randomize to avoid congestion)
if (($self->{check} || 0) < steady_time) {
my $app = $self->app;
@@ -80,10 +83,12 @@ Minion::Command::minion::worker - Minion worker command
Usage: APPLICATION minion worker [OPTIONS]
./myapp.pl minion worker
- ./myapp.pl minion worker -m production -I 15 -R 3600 -j 10
+ ./myapp.pl minion worker -m production -I 15 -C 5 -R 3600 -j 10
./myapp.pl minion worker -q important -q default
Options:
+ -C, --command-interval <seconds> Worker remote control command interval,
+ defaults to 10
-h, --help Show this summary of available options
--home <path> Path to home directory of your
application, defaults to the value of
@@ -117,19 +122,18 @@ Stop gracefully after finishing the current jobs.
Stop immediately without finishing the current jobs.
-=head2 TTIN
-
-Increase the number of jobs to perform concurrently by one.
+=head1 REMOTE CONTROL COMMANDS
-=head2 TTOU
+The L<Minion::Command::minion::worker> process can be controlled at runtime
+with the following remote control commands.
-Decrease the number of jobs to perform concurrently by one.
+=head2 jobs
-=head2 USR1
+ $ ./myapp.pl minion job -c jobs -a '[10]' 23
-Pause the worker by setting the number of jobs to perform concurrently to zero.
-That means it will finish all current jobs, but not accept new ones, until the
-number is increased again with L</"TTIN">.
+Change the number of jobs to perform concurrently. Setting this value to C<0>
+will effectively pause the worker. That means all current jobs will be finished,
+but no new ones accepted, until the number is increased again.
=head1 ATTRIBUTES
diff --git a/lib/Minion/Worker.pm b/lib/Minion/Worker.pm
index db5eb80..793e37a 100644
--- a/lib/Minion/Worker.pm
+++ b/lib/Minion/Worker.pm
@@ -1,8 +1,11 @@
package Minion::Worker;
use Mojo::Base 'Mojo::EventEmitter';
+has commands => sub { {} };
has [qw(id minion)];
+# Register a callback under the given remote control command name, the worker
+# is returned for chaining as long as the callback is a true value
+sub add_command {
+  my ($self, $name, $cb) = @_;
+  return ($self->commands->{$name} = $cb) && $self;
+}
+
sub dequeue {
my ($self, $wait, $options) = @_;
@@ -24,6 +27,19 @@ sub dequeue {
sub info { $_[0]->minion->backend->worker_info($_[0]->id) }
+# Ask the backend for the remote control commands waiting for this worker and
+# invoke the registered callback for each of them, returns the worker
+sub process_commands {
+  my $self = shift;
+
+  my $inbox = $self->minion->backend->receive_commands($self->id);
+  for my $entry (@{$inbox || []}) {
+
+    # A false entry ends processing
+    last unless $entry;
+    my ($name, @args) = @$entry;
+
+    # Commands without a registered callback are skipped
+    my $cb = $self->commands->{$name} or next;
+    $self->$cb(@args);
+  }
+
+  return $self;
+}
+
sub register { $_[0]->id($_[0]->minion->backend->register_worker($_[0]->id)) }
sub unregister {
@@ -74,6 +90,13 @@ Emitted in the worker process after a job has been dequeued.
L<Minion::Worker> implements the following attributes.
+=head2 commands
+
+ my $commands = $worker->commands;
+ $worker = $worker->commands({jobs => sub {...}});
+
+Registered worker remote control commands.
+
=head2 id
my $id = $worker->id;
@@ -93,6 +116,17 @@ L<Minion> object this worker belongs to.
L<Minion::Worker> inherits all methods from L<Mojo::EventEmitter> and
implements the following new ones.
+=head2 add_command
+
+ $worker = $worker->add_command(jobs => sub {...});
+
+Register a worker remote control command.
+
+ $worker->add_command(foo => sub {
+ my ($worker, @args) = @_;
+ ...
+ });
+
=head2 dequeue
my $job = $worker->dequeue(0.5);
@@ -159,6 +193,12 @@ Epoch time worker was started.
=back
+=head2 process_commands
+
+ $worker = $worker->process_commands;
+
+Process worker remote control commands.
+
=head2 register
$worker = $worker->register;
diff --git a/t/pg.t b/t/pg.t
index 9341318..5d8cc15 100644
--- a/t/pg.t
+++ b/t/pg.t
@@ -24,11 +24,11 @@ my $worker = $minion->repair->worker;
isa_ok $worker->minion->app, 'Mojolicious', 'has default application';
# Migrate up and down
-is $minion->backend->pg->migrations->active, 11, 'active version is 11';
+is $minion->backend->pg->migrations->active, 12, 'active version is 12';
is $minion->backend->pg->migrations->migrate(0)->active, 0,
'active version is 0';
-is $minion->backend->pg->migrations->migrate->active, 11,
- 'active version is 11';
+is $minion->backend->pg->migrations->migrate->active, 12,
+ 'active version is 12';
# Register and unregister
$worker->register;
@@ -641,6 +641,24 @@ is $minion->job($id)->info->{state}, 'failed', 'right state';
is $minion->job($id)->info->{result}, 'Parent went away', 'right result';
$worker->unregister;
+# Worker remote control commands
+$worker = $minion->worker->register;
+my @commands;
+$worker->add_command(test_id => sub { push @commands, shift->id })->register;
+$worker->add_command(test_args => sub { shift and push @commands, [@_] });
+$minion->backend->send_command($worker->id, 'test_id', []);
+$worker->process_commands->register;
+is_deeply \@commands, [$worker->id], 'right structure';
+@commands = ();
+$minion->backend->send_command($worker->id, 'test_id', []);
+$minion->backend->send_command($worker->id, 'test_whatever', []);
+$minion->backend->send_command($worker->id, 'test_args',
+ [1, [2], {3 => 'three'}]);
+$worker->process_commands;
+is_deeply \@commands, [$worker->id, [1, [2], {3 => 'three'}]],
+ 'right structure';
+$worker->unregister;
+
# Clean up once we are done
$pg->db->query('drop schema minion_test cascade');
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment