Skip to content

Instantly share code, notes, and snippets.

@shoorick
Forked from knutov/gist:78b7043ee6ea4f345f6da55670e65582
Created September 18, 2018 19:10
Show Gist options
  • Save shoorick/d8efeade2c47e4a2cd4eb0c1f1b71579 to your computer and use it in GitHub Desktop.
Save shoorick/d8efeade2c47e4a2cd4eb0c1f1b71579 to your computer and use it in GitHub Desktop.
mojolicious async external command execution using Mojo::IOLoop
#!/usr/bin/perl
# $Id: receiver_manage.pl 1977 2010-10-14 09:12:15Z bfg $
# $Date: 2010-10-14 11:12:15 +0200 (Thu, 14 Oct 2010) $
# $Author: bfg $
# $Revision: 1977 $
# $LastChangedRevision: 1977 $
# $LastChangedBy: bfg $
# $LastChangedDate: 2010-10-14 11:12:15 +0200 (Thu, 14 Oct 2010) $
# $URL: https://svn.interseek.com/repositories/admin/misc/cic/receiver_manage.pl $
use strict;
use warnings;
# Make reloading work
BEGIN { $INC{$0} = $0 }
use FindBin;
use IO::File;
use IPC::Open3;
use Mojo::IOLoop;
use Mojolicious::Lite;
use POSIX ":sys_wait_h";
use Time::HiRes qw(time);
# Directory this script lives in, as reported by FindBin; it is appended to
# PATH (presumably so helper commands stored next to the script can be
# spawned by name -- confirm).
my $_bin_dir = $FindBin::RealBin;
$ENV{PATH} .= ":" . $_bin_dir;
# Last error message; set by command_start() and reported by the REST handler.
my $Error = '';
# Event loop singleton shared by the reaper timer and the REST handler.
my $loop = Mojo::IOLoop->singleton();
# Clean up dead (zombie) child processes with a recurring timer
# (interval argument: 1).
# No better way to do this properly was found at the time of writing;
# the previous per-tick hook is kept below for reference.
#$loop->tick_cb(\ &child_reaper);
$loop->recurring(1 => \&child_reaper);
####################################################
# FUNCTIONS #
####################################################
# Reaps every child process that has already exited.
#
# Polls waitpid() in non-blocking mode (WNOHANG) for any child (-1) and logs
# each collected pid at debug level; stops as soon as waitpid() reports that
# nothing more can be collected.
sub child_reaper {
    while (1) {
        my $reaped = waitpid(-1, WNOHANG);
        last if $reaped <= 0;
        app->log->debug("Reaped child pid: $reaped");
    }
}
# Returns the shell command line to spawn for a request.
#
# Any arguments (the REST handler passes target and host) are accepted but
# currently ignored: the same fixed command is returned for every call.
sub command_get {
    return 'cat /dev/urandom | head -c 1G';
}
# Spawns an external command through IPC::Open3.
#
# Parameters:
#   $cmd - command line to run.
#
# Returns ($stdout, $stderr, $pid) on success, with both output handles
# switched to non-blocking mode. On failure the file-level $Error is set
# and (undef, undef) is returned.
sub command_start {
    my ($cmd) = @_;

    # one fresh handle per standard stream of the child
    my ($stdin, $stdout, $stderr) = map { IO::Handle->new() } 1 .. 3;

    app->log->debug("starting: $cmd");

    # open3() reports failures by dying, hence the eval
    my $pid = eval { open3($stdin, $stdout, $stderr, $cmd) };
    if ($@) {
        $Error = "Exception while starting command '$cmd': $@";
        return (undef, undef);
    }
    if (!defined $pid || $pid <= 0) {
        $Error = "Error starting external command: $!";
        return (undef, undef);
    }

    app->log->debug("Program started as pid $pid.");

    # the output handles must not block the caller when read
    $_->blocking(0) for $stdout, $stderr;

    return ($stdout, $stderr, $pid);
}
# Read handler for one output stream of the external command; registered as
# the on_read() callback by the REST route.
#
# Parameters:
#   $self  - controller of the request being served
#   $loop  - first argument supplied by the I/O loop callback
#   $id    - second argument supplied by the I/O loop callback
#   $chunk - data that was just read
#   $type  - key of $data to append to ('stdout' or 'stderr')
#   $data  - hashref accumulating the output of the command
#   $pid   - pid of the external command
#
# Appends $chunk to $data->{$type}. When $pid is a valid pid, the process is
# sent SIGTERM if it is still alive and the accumulated data is rendered via
# render_output(), whose result is returned. Without a valid pid nothing is
# rendered and an empty return is made.
#
# NOTE(review): this runs for every read event on either stream, so
# render_output() may be reached more than once per request -- confirm that
# this is intended.
sub async_read {
    my ($self, $loop, $id, $chunk, $type, $data, $pid) = @_;
    app->log->debug("output($type) [loop=$loop, id=$id]: '$chunk'");

    # append to the accumulator buffer
    $data->{$type} .= $chunk;

    # nothing to signal or render without a valid child pid
    return unless defined $pid && $pid > 0;

    # signal 0 only probes whether the process can still be signalled
    if (kill(0, $pid)) {
        app->log->debug("output $type: external command $pid is alive.");
        # WARNING: the subprocess must be terminated here, otherwise it
        # stays alive forever...
        kill('TERM', $pid);
    }

    return render_output($self, $data);
}
# Renders a fatal REST error.
#
# Parameters:
#   $self - controller (must provide res() and render())
#   $data - structure to send as the JSON response body
#
# Sets a Cache-Control header that forbids caching, sets the response code
# to 503 and returns whatever $self->render(json => $data) returns.
sub render_err {
    my ($self, $data) = @_;
    my $res = $self->res();
    # Cache-Control directives are separated by commas; a semicolon is not a
    # valid separator and makes the header value malformed.
    $res->headers->header('Cache-Control', 'no-cache, max-age=0');
    $res->code(503);
    return $self->render('json' => $data);
}
# Sends the collected output of the external command to the client.
#
# Parameters:
#   $self - controller of the request being served
#   $s    - hashref with the accumulated 'stdout' and 'stderr' strings
#
# When stdout holds data it is rendered verbatim as the response body;
# otherwise an error structure quoting stderr is rendered via render_err().
# Returns the result of the render call that was made.
sub render_output {
    my ($self, $s) = @_;

    my $out_bytes = length($s->{stdout});
    my $err_bytes = length($s->{stderr});
    app->log->debug(" Read $out_bytes bytes of stdout and $err_bytes bytes of stderr.");

    # no stdout at all: nothing useful can be returned, report it as an error
    unless ($out_bytes > 0) {
        my %failure = (
            ok    => 0,
            error => "No data in stdout, external command exit status; stderr: $s->{stderr}",
        );
        return render_err($self, \%failure);
    }

    # An earlier variant (disabled) evaluated stdout as Perl source and
    # rendered the resulting structure as JSON; the raw bytes are sent now.
    return $self->render('data' => $s->{stdout});
}
# Error / hang-up handler for an output stream of the external command.
#
# Logs its two arguments (the REST route passes the controller and the
# stream name) and then runs the process reaper, whose result is returned.
sub async_read_error {
    my ($origin, $stream) = @_;
    my $message = "I/O hangup: loop=$origin; handle=$stream";
    app->log->debug($message);
    # collect any child that has exited in the meantime
    return child_reaper();
}
####################################################
# URL HANDLERS #
####################################################
# GET /rest/:host/:target
#
# Spawns the external command returned by command_get() and registers its
# stdout and stderr handles with the I/O loop; the response is produced later
# from the read callbacks (async_read) rather than by this handler itself.
# Failures to obtain or start the command are answered immediately through
# render_err() with the message stored in $Error.
get '/rest/:host/:target' => [
host => qr/[\w\.\-]+/,
target => qr/[a-z_]+/
], => sub {
my ($self) = @_;
my $host = $self->param('host');
my $target = $self->param('target');
my $cmd = command_get($target, $host);
#my $cmd = "/tmp/test.sh";
if (! defined $cmd) {
return render_err(
$self,
{ ok => 0, error => $Error}
);
}
# Accumulator shared by both read callbacks (stdout/stderr buffers, start
# time, exit status).
# NOTE(review): exit_status is initialised to 1 and nothing in this handler
# updates it -- confirm whether it is still needed.
my $s = {
time_start => time(),
stdout => '',
stderr => '',
exit_status => 1,
};
# Start the external command and get its output filehandles and pid.
my ($stdout, $stderr, $pid) = command_start($cmd);
unless (defined $stdout && defined $stderr) {
return render_err(
$self,
{ ok => 0, error => $Error}
);
}
# Add stdout to the I/O loop. The read callback forwards the controller, the
# loop's own callback arguments, the stream name, the accumulator and the
# child pid to async_read(); error and hang-up callbacks pass the controller
# and the stream name to async_read_error().
# NOTE(review): the returned id is stored but not used afterwards.
my $id_stdout = $loop->client(
socket => $stdout,
on_read => sub { async_read($self, @_, 'stdout', $s, $pid); },
on_error => sub { async_read_error($self, 'stdout') },
on_hup => sub { async_read_error($self, 'stdout') },
);
# Add stderr to the I/O loop, wired up the same way as stdout.
my $id_stderr = $loop->client(
socket => $stderr,
on_read => sub { async_read($self, @_, 'stderr', $s, $pid); },
on_error => sub { async_read_error($self, 'stderr') },
on_hup => sub { async_read_error($self, 'stderr') },
);
# Nothing is rendered here; the callbacks registered above take over.
};
# GET / -- route with an empty action and the route name "/index".
# NOTE(review): presumably meant to render the index.html.ep template from
# the __DATA__ section -- confirm that this route name resolves to it.
get '/' => sub {} => "/index";
# Start the Mojolicious command system
# (setting a random application secret is left disabled)
#app->secrets([rand()]);
app->start;
# EOF
__DATA__
@@ index.html.ep
Coming soon
@@ resthosttarget.html.ep
Long process is working...
#!/usr/bin/perl
use Mojolicious::Lite;
use POSIX ":sys_wait_h";
# Streams random bytes to the client as a file download.
#
# The :size placeholder (digits plus an optional unit letter, see the route
# restriction) is passed to `head -c` reading /dev/urandom; the output of
# that child process is relayed to the client chunk by chunk through a drain
# callback. If the child cannot be started, a 500 response is rendered
# instead.
any '/:size' => [ 'size' => qr/\d+[KMGTPEZY]?/i ] => sub {
    my $self = shift;
    my $size = $self->param('size');

    # List-form pipe open runs head directly, without a shell, so $size is
    # always handed over as a single argument.
    my $pid = open(my $fh, '-|', 'head', '-c', $size, '/dev/urandom');
    unless ($pid) {
        # Without a child there is nothing to stream: report the failure and
        # stop instead of reading from an unopened handle.
        $self->app->log->error("Cannot get data: $!");
        return $self->render(text => 'Cannot get data', status => 500);
    }
    $self->app->log->debug("PID $pid");

    $self->res->headers->header('Content-Disposition' => qq{attachment; filename="$size.txt"});
    $self->render_later;

    my $CHUNK_SIZE = 1<<20; # randomly chosen
    my $drain;

    # Release the child once the transaction is over, whether the download
    # completed or the client went away early.
    $self->on(finish => sub {
        my $c = shift;
        $c->app->log->debug("Time to kill process $pid");
        # The drain callback refers to itself through $drain; clearing the
        # variable breaks that cycle so the closure can be freed.
        undef $drain;
        # Stop a child that is still producing data, then close the pipe;
        # closing a piped handle also waits for the child, so no zombie is
        # left behind.
        kill 'TERM', $pid;
        close $fh;
        $c->app->log->debug("Process $pid ended");
    });

    # Write one chunk per drain event; finish on end of data or read error.
    $drain = sub {
        my $c = shift;
        my $got = read($fh, my $chunk, $CHUNK_SIZE);
        $c->app->log->error("Reading from process $pid failed: $!")
            unless defined $got;
        return $got ? $c->write($chunk, $drain) : $c->finish;
    };

    # Start writing directly with the drain callback
    $self->$drain;
};
# Any request method on / is routed under the name 'index'; with no action
# given, presumably the index.html.ep template from the __DATA__ section is
# rendered -- confirm.
any '/' => 'index';
# Start the Mojolicious command system
app->start;
__DATA__
@@ index.html.ep
<h2>Usage</h2>
Go to <code>host:port/<b>Size</b>Unit</code>
<br>where <code>Size</code> is a non-negative integer and
<code>Unit</code> is an optional letter, one of K, M, G, T, P, E, Z, Y.
<h2>Examples</h2>
<ul>
<li><a href="<%= url_for('/1K') %>">1K</a></li>
<li><a href="<%= url_for('/2M') %>">2M</a></li>
<li><a href="<%= url_for('/3G') %>">3G</a></li>
</ul>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment