Skip to content

Instantly share code, notes, and snippets.

@ggl
Last active March 11, 2019 13:24
Show Gist options
  • Save ggl/326d6d01fdae2c91be2d19cb7f26dbf5 to your computer and use it in GitHub Desktop.
Save ggl/326d6d01fdae2c91be2d19cb7f26dbf5 to your computer and use it in GitHub Desktop.
Mojo::IOLoop::Subprocess task runner
#!/usr/bin/env perl
use strict;
use warnings;
use Data::Dumper;
use Mojo::Log;
use Mojo::IOLoop;
use Mojo::IOLoop::Subprocess;
## Simple task runner: every $poll_cycle seconds the queue is scanned and up
## to $max_child tasks are handed to forked Mojo::IOLoop::Subprocess children.
##
## Bookkeeping shared by all event loop callbacks below:
my %procs;      ## pid  => subprocess object of a running child
my %tasks;      ## task => pid of the child working on it
my %pending;    ## task => epoch time at which a start timer was armed for it
my @queue = qw(1 2 3 4 5 6 7 8 9);    ## tasks not completed yet, in run order
my $max_child  = 3;     ## maximum number of concurrently running children
my $poll_cycle = 10;    ## seconds between two scans of the queue

my $log = Mojo::Log->new(handle => *STDERR, level => 'debug');
$log->info("Worker started, pid: $$");

## Record which child is working on which task.
## Returns the pid, or nothing when the subprocess has no pid yet.
sub _register_task {
    my ($task, $proc) = @_;
    my $pid = $proc->pid or return;
    $procs{$pid}  = $proc;
    $tasks{$task} = $pid;
    return $pid;
}

## Drop all bookkeeping for a task whose child is gone (or never started),
## which frees its slot for the next queued task.
sub _release_task {
    my ($task, $pid) = @_;
    delete $procs{$pid} if defined $pid;
    delete $tasks{$task};
    delete $pending{$task};
    return;
}

## Fork a child for $task and wire up the parent-side callbacks.
## Returns the Mojo::IOLoop::Subprocess object.
sub _start_task {
    my ($task) = @_;
    $log->info("New task: $task");
    my $proc = Mojo::IOLoop::Subprocess->new;
    $proc->run(
        ## Child side: announce ourselves to the parent, then do the work.
        sub {
            my $sp = shift;
            $log->debug('Spawned child pid: '.$sp->pid.', $$: '.$$);
            $sp->progress($task);
            ## placeholder workload; the trailing 1 makes success explicit
            my $ok = eval {
                sleep(int(rand(30)));
                1;
            };
            if (!$ok) {
                $log->error("Task $task has failed", $@);
            }
            return $task;
        },
        ## Parent side: called with the error (if any) and the child's result.
        sub {
            my ($sp, $err, $result) = @_;
            my $pid = $sp->pid;    ## undefined when the fork itself failed
            $log->debug('Parent pid: '.($pid // 'none').', $$: '.$$);
            if ($err) {
                $log->error($err);
            }
            ## This callback runs after the child has finished, so no signal
            ## is sent to its pid any more: the number may already belong to
            ## an unrelated process, and signal 1 is a real SIGHUP rather
            ## than an existence probe. The slot is released unconditionally
            ## so that a failed child cannot block the runner forever.
            _release_task($task, $pid);
            if (!$err and defined $result) {
                $log->info("Task ".$task." done.");
                ## string comparison, so non-numeric task names work as well
                @queue = grep { $_ ne $task } @queue;
            }
            else {
                ## the task stays in @queue and is picked up by a later scan
                $log->warn("Task $task did not complete, leaving it queued");
            }
            return 1;
        }
    );
    ## The child reports its task through progress(); that is the point at
    ## which the parent reliably knows the pid and marks the task as active.
    $proc->on(progress => sub {
        my ($sp, $started) = @_;
        $log->debug('pid: '.$sp->pid.', task: '.$started);
        _register_task($started, $sp);
    });
    $log->debug(Dumper [keys %procs], \%tasks);
    ## NOTE(review): the pid is presumably not assigned yet at this point if
    ## run() defers the fork; in that case this is a no-op and registration
    ## happens in the progress handler above -- confirm against the
    ## installed Mojolicious version.
    _register_task($task, $proc);
    return $proc;
}

## Scan the queue once and arm start timers for tasks that are neither
## pending nor active, as long as free child slots are left.
sub _schedule_tasks {
    my $active_pids = scalar keys %procs;
    $log->info('Tasks queued: '.(scalar(@queue) - $active_pids).', active: '.$active_pids.'/'.$max_child);
    my $i = 0;
    foreach my $task (@queue) {
        ## skip pending tasks
        next if $pending{$task};
        ## skip active tasks
        next if $tasks{$task};
        ## %procs cannot change while this loop runs, so once the limits are
        ## reached nothing further can be scheduled in this scan
        last unless $active_pids < $max_child and $i < $max_child;
        ## run queued tasks in order
        ## guard against arming several timers for the same task when a scan
        ## fires before an earlier timer has run, by remembering the task as
        ## pending until it finishes or its timer finds no free slot
        $pending{$task} ||= time;
        ## stagger the starts: the n-th task of this scan starts after n seconds
        Mojo::IOLoop->timer(++$i => sub {
            ## re-check, a slot may have been taken since the timer was armed
            if (scalar(keys %procs) < $max_child) {
                _start_task($task);
            }
            else {
                ## no free slot: give the task back to the next scan
                delete $pending{$task};
            }
        });
    }
    return;
}

## The scan is called directly from the recurring timer; wrapping a single
## step in Mojo::IOLoop->delay added nothing and that method no longer
## exists in current Mojolicious releases.
Mojo::IOLoop->recurring($poll_cycle => sub { _schedule_tasks() });

## start event loop
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment