Skip to content

Instantly share code, notes, and snippets.

@sebnow
Created January 12, 2012 12:29
Show Gist options
  • Save sebnow/1600230 to your computer and use it in GitHub Desktop.
Save sebnow/1600230 to your computer and use it in GitHub Desktop.
ZeroMQ Worker Job Dispatch (Perl)
#!/usr/bin/perl
=head1 NAME
dispatch.pl - Dispatch work to workers
=head1 SYNOPSIS
cat /proc/vmstat | perl dispatch.pl
=head1 DESCRIPTION
The script will read from standard input and send each line to a worker.
The worker processes the line and responds with the result.
=cut
use strict;
use warnings;
use AnyEvent;
use Fcntl;
use Getopt::Long;
use Log::Log4perl qw(:easy);
use ZeroMQ 0.20 qw(:all); # fixes passing flags to recv_as
Log::Log4perl->easy_init($TRACE);
my $log = Log::Log4perl->get_logger();
my $listen_addr = "tcp://0.0.0.0:1331";
my $report_addr = "tcp://0.0.0.0:1332";
GetOptions(
'l|listen=s' => \$listen_addr,
'r|report=s' => \$report_addr
);
my $ctx = ZeroMQ::Context->new();
my $dispatcher = $ctx->socket(ZMQ_PUSH);
$dispatcher->setsockopt(ZMQ_LINGER, 0);
$dispatcher->bind($listen_addr);
my $reporter = $ctx->socket(ZMQ_SUB);
$reporter->setsockopt(ZMQ_SUBSCRIBE, "");
$reporter->bind($report_addr);
$log->info("Dispatching on $listen_addr");
$log->info("Reporting on $report_addr");
{ # Set STDIN to be non-blocking
my $flags = 0;
fcntl(STDIN, F_GETFL, $flags)
or die "Couldn't get flags for STDIN : $!\n";
$flags |= O_NONBLOCK;
fcntl(STDIN, F_SETFL, $flags)
or die "Couldn't set flags for STDIN $!\n";
}
my $unspool_w = AE::io(\*STDIN, 0, sub {
$log->debug("STDIN is ready to read");
my $payload = <STDIN>;
chomp($payload);
$log->debug($payload || '');
defined($payload) or return;
my $payload_cut = length($payload) > 24
? substr($payload, 0, 21) . "..."
: $payload;
$log->debug("Sending \"$payload_cut\" to worker");
$dispatcher->send_as('json' => {'message' => $payload});
$log->trace("Sent!");
});
use JSON;
my $report_w = AE::io($reporter->getsockopt(ZMQ_FD), 0, sub {
$log->trace("Received response from reporter");
if($reporter->getsockopt(ZMQ_EVENTS) & ZMQ_POLLIN) {
$log->trace("Ready to read");
while(defined(my $payload = $reporter->recv_as('json' => ZMQ_NOBLOCK))) {
use Data::Dumper;
$log->debug(Dumper($payload));
my $message = $payload->{'message'};
my $message_cut = length($message) > 24
? substr($message, 0, 21) . "..."
: $message;
$log->info("Message \"$message_cut\" has $payload->{length} characters");
}
$log->trace("Finished reading all messages");
}
});
$log->trace("Entering main loop");
AE::cv->recv();
exit(0);
#!/usr/bin/perl
=head1 NAME
worker.pl - Process work given by the dispatcher
=head1 SYNOPSIS
perl worker.pl
=head1 DESCRIPTION
The script will listen for messages from the dispatcher. It will count
the amount of characters in the payload and send back the result.
=cut
use strict;
use warnings;
use AnyEvent;
use Getopt::Long;
use Log::Log4perl qw(:easy);
use ZeroMQ qw(:all);
Log::Log4perl->easy_init($TRACE);
my $log = Log::Log4perl->get_logger();
my $listen_addr = "tcp://0.0.0.0:1331";
my $report_addr = "tcp://0.0.0.0:1332";
GetOptions(
'l|listen=s' => \$listen_addr,
'r|report=s' => \$report_addr
);
my $ctx = ZeroMQ::Context->new();
my $dispatcher = $ctx->socket(ZMQ_PULL);
$dispatcher->connect($listen_addr);
my $reporter = $ctx->socket(ZMQ_PUB);
$reporter->connect($report_addr);
$log->info("Listening on $listen_addr");
$log->info("Reporting on $report_addr");
my $work_w = AE::io($dispatcher->getsockopt(ZMQ_FD), 0, sub {
$log->trace("Received message from dispatcher");
if($dispatcher->getsockopt(ZMQ_EVENTS) & ZMQ_POLLIN) {
$log->trace("Ready to read");
while(my $payload = $dispatcher->recv_as('json', ZMQ_NOBLOCK)) {
my $message = $payload->{'message'};
my $message_cut = length($message) > 24
? substr($message, 0, 21) . "..."
: $message;
$log->debug("Received message \"$message_cut\" (" . length($message) . ")");
$reporter->send_as('json' => {'message' => $message, 'length' => length($message)});
}
}
});
$log->trace("Entering main loop");
AE::cv->recv();
exit(0);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment