Created
January 12, 2012 12:29
-
-
Save sebnow/1600230 to your computer and use it in GitHub Desktop.
ZeroMQ Worker Job Dispatch (Perl)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/perl | |
=head1 NAME | |
dispatch.pl - Dispatch work to workers | |
=head1 SYNOPSIS | |
cat /proc/vmstat | perl dispatch.pl | |
=head1 DESCRIPTION | |
The script will read from standard input and send each line to a worker. | |
The worker processes the line and responds with the result. | |
=cut | |
use strict; | |
use warnings; | |
use AnyEvent; | |
use Fcntl; | |
use Getopt::Long; | |
use Log::Log4perl qw(:easy); | |
use ZeroMQ 0.20 qw(:all); # fixes passing flags to recv_as | |
Log::Log4perl->easy_init($TRACE); | |
my $log = Log::Log4perl->get_logger(); | |
my $listen_addr = "tcp://0.0.0.0:1331"; | |
my $report_addr = "tcp://0.0.0.0:1332"; | |
GetOptions( | |
'l|listen=s' => \$listen_addr, | |
'r|report=s' => \$report_addr | |
); | |
my $ctx = ZeroMQ::Context->new(); | |
my $dispatcher = $ctx->socket(ZMQ_PUSH); | |
$dispatcher->setsockopt(ZMQ_LINGER, 0); | |
$dispatcher->bind($listen_addr); | |
my $reporter = $ctx->socket(ZMQ_SUB); | |
$reporter->setsockopt(ZMQ_SUBSCRIBE, ""); | |
$reporter->bind($report_addr); | |
$log->info("Dispatching on $listen_addr"); | |
$log->info("Reporting on $report_addr"); | |
{ # Set STDIN to be non-blocking | |
my $flags = 0; | |
fcntl(STDIN, F_GETFL, $flags) | |
or die "Couldn't get flags for STDIN : $!\n"; | |
$flags |= O_NONBLOCK; | |
fcntl(STDIN, F_SETFL, $flags) | |
or die "Couldn't set flags for STDIN $!\n"; | |
} | |
my $unspool_w = AE::io(\*STDIN, 0, sub { | |
$log->debug("STDIN is ready to read"); | |
my $payload = <STDIN>; | |
chomp($payload); | |
$log->debug($payload || ''); | |
defined($payload) or return; | |
my $payload_cut = length($payload) > 24 | |
? substr($payload, 0, 21) . "..." | |
: $payload; | |
$log->debug("Sending \"$payload_cut\" to worker"); | |
$dispatcher->send_as('json' => {'message' => $payload}); | |
$log->trace("Sent!"); | |
}); | |
use JSON; | |
my $report_w = AE::io($reporter->getsockopt(ZMQ_FD), 0, sub { | |
$log->trace("Received response from reporter"); | |
if($reporter->getsockopt(ZMQ_EVENTS) & ZMQ_POLLIN) { | |
$log->trace("Ready to read"); | |
while(defined(my $payload = $reporter->recv_as('json' => ZMQ_NOBLOCK))) { | |
use Data::Dumper; | |
$log->debug(Dumper($payload)); | |
my $message = $payload->{'message'}; | |
my $message_cut = length($message) > 24 | |
? substr($message, 0, 21) . "..." | |
: $message; | |
$log->info("Message \"$message_cut\" has $payload->{length} characters"); | |
} | |
$log->trace("Finished reading all messages"); | |
} | |
}); | |
$log->trace("Entering main loop"); | |
AE::cv->recv(); | |
exit(0); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/perl | |
=head1 NAME | |
worker.pl - Process work given by the dispatcher | |
=head1 SYNOPSIS | |
perl worker.pl | |
=head1 DESCRIPTION | |
The script will listen for messages from the dispatcher. It will count | |
the amount of characters in the payload and send back the result. | |
=cut | |
use strict; | |
use warnings; | |
use AnyEvent; | |
use Getopt::Long; | |
use Log::Log4perl qw(:easy); | |
use ZeroMQ qw(:all); | |
Log::Log4perl->easy_init($TRACE); | |
my $log = Log::Log4perl->get_logger(); | |
my $listen_addr = "tcp://0.0.0.0:1331"; | |
my $report_addr = "tcp://0.0.0.0:1332"; | |
GetOptions( | |
'l|listen=s' => \$listen_addr, | |
'r|report=s' => \$report_addr | |
); | |
my $ctx = ZeroMQ::Context->new(); | |
my $dispatcher = $ctx->socket(ZMQ_PULL); | |
$dispatcher->connect($listen_addr); | |
my $reporter = $ctx->socket(ZMQ_PUB); | |
$reporter->connect($report_addr); | |
$log->info("Listening on $listen_addr"); | |
$log->info("Reporting on $report_addr"); | |
my $work_w = AE::io($dispatcher->getsockopt(ZMQ_FD), 0, sub { | |
$log->trace("Received message from dispatcher"); | |
if($dispatcher->getsockopt(ZMQ_EVENTS) & ZMQ_POLLIN) { | |
$log->trace("Ready to read"); | |
while(my $payload = $dispatcher->recv_as('json', ZMQ_NOBLOCK)) { | |
my $message = $payload->{'message'}; | |
my $message_cut = length($message) > 24 | |
? substr($message, 0, 21) . "..." | |
: $message; | |
$log->debug("Received message \"$message_cut\" (" . length($message) . ")"); | |
$reporter->send_as('json' => {'message' => $message, 'length' => length($message)}); | |
} | |
} | |
}); | |
$log->trace("Entering main loop"); | |
AE::cv->recv(); | |
exit(0); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment