Skip to content

Instantly share code, notes, and snippets.

@stevan
Created April 11, 2011 02:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stevan/912985 to your computer and use it in GitHub Desktop.
Save stevan/912985 to your computer and use it in GitHub Desktop.
#!perl
use 5.10.0;
use strict;
use warnings;
use Data::Dumper;
use ZeroMQ qw/:all/;
# Interactive client.
#
# Reads one line at a time from STDIN, sends it to the coordinator over a
# REQ socket and expects a uuid string in the reply.  It then subscribes to
# that uuid on the SUB socket and prints every message received until one
# arrives whose payload (the text after "<uuid> ") is empty.
my $coord_addr     = 'tcp://127.0.0.1:6666';
my $publisher_addr = 'tcp://127.0.0.1:7777';

my $cxt = ZeroMQ::Context->new;
say "welcome to my client ...";

my $coordinator = $cxt->socket(ZMQ_REQ);
$coordinator->connect($coord_addr);
say "coordinator connected to $coord_addr";

my $subscriber = $cxt->socket(ZMQ_SUB);
$subscriber->connect($publisher_addr);
# This socket connects (it does not bind), so say so.
say "subscriber connected to $publisher_addr";

REQUEST:
while (1) {
    print "? ";
    my $input = <STDIN>;

    # EOF (Ctrl-D / closed pipe): stop instead of looping forever on undef.
    last REQUEST unless defined $input;
    chomp $input;

    say "=> sending input=($input) to server";
    $coordinator->send($input);

    my $uuid = $coordinator->recv->data;
    say "<= got back " . $uuid . " from server";

    # Only messages whose body starts with this uuid will be delivered.
    $subscriber->setsockopt(ZMQ_SUBSCRIBE, $uuid);
    say "starting subscription ...";

    MESSAGE:
    while (1) {
        my $data = $subscriber->recv->data;
        say "got $data";

        # Strip the "<uuid> " prefix; \Q..\E keeps the uuid literal.
        $data =~ s/\A\Q$uuid\E //;

        # An empty payload marks the end of the stream.  Test the length,
        # not truthiness, so a payload of "0" is not mistaken for the end.
        last MESSAGE unless length $data;
    }

    # Drop the filter for the finished stream so filters do not pile up
    # on the socket across requests.
    $subscriber->setsockopt(ZMQ_UNSUBSCRIBE, $uuid);
}
#!perl
use 5.10.0;
use strict;
use warnings;
use Data::Dumper;
use Data::UUID;
use ZeroMQ qw/:all/;
use ZeroMQ::Raw qw/zmq_device/;
# Broker / server.
#
# Binds three sockets:
#   * an XREP front-end (clients connect here),
#   * an XREQ back-end (workers connect here),
#   * a PUB socket that re-publishes everything the workers publish.
# For every port given on the command line it also connects a SUB socket
# (subscribed to everything) to tcp://127.0.0.1:<port>.  The main loop polls
# all sockets and shuttles whole multipart messages between them.
my @subscription_ports = @ARGV;

my $backend_addr   = 'tcp://127.0.0.1:5555';
my $frontend_addr  = 'tcp://127.0.0.1:6666';
my $publisher_addr = 'tcp://127.0.0.1:7777';

say "Welcome to my server ....";

my $cxt = ZeroMQ::Context->new;

my $frontend = $cxt->socket(ZMQ_XREP);
$frontend->bind($frontend_addr);
say "front-end bound to $frontend_addr";

my $backend = $cxt->socket(ZMQ_XREQ);
$backend->bind($backend_addr);
say "back-end bound to $backend_addr";

my $publisher = $cxt->socket(ZMQ_PUB);
$publisher->bind($publisher_addr);
say "publisher bound to $publisher_addr";

my @subscribers;
foreach my $port ( @subscription_ports ) {
    # Ports come straight from @ARGV; refuse anything that is not a number
    # before splicing it into a socket address.
    die "invalid subscription port '$port'\n"
        unless $port =~ /\A\d+\z/;

    my $addr       = 'tcp://127.0.0.1:' . $port;
    my $subscriber = $cxt->socket(ZMQ_SUB);
    $subscriber->connect($addr);
    say "subscriber connected to $addr";

    # Empty prefix: receive every message published on that address.
    $subscriber->setsockopt(ZMQ_SUBSCRIBE, '');

    push @subscribers => +{
        id     => $port,
        socket => $subscriber,
    };
}

my $poller = ZeroMQ::Poller->new(
    {
        name   => 'frontend',
        socket => $frontend,
        events => ZMQ_POLLIN,
    },
    {
        name   => 'backend',
        socket => $backend,
        events => ZMQ_POLLIN,
    },
    map {
        +{
            name   => 'subscriber:' . $_->{id},
            socket => $_->{socket},
            events => ZMQ_POLLIN,
        }
    } @subscribers
);

# forward_message($source, $sink)
#
# Receives one complete (possibly multipart) message from $source and sends
# it to $sink, setting ZMQ_SNDMORE on every part except the last so the
# part boundaries are preserved.  Returns nothing.
sub forward_message {
    my ($source, $sink) = @_;

    while (1) {
        my $message = $source->recv;
        my $more    = $source->getsockopt(ZMQ_RCVMORE);
        $sink->send($message, $more ? ZMQ_SNDMORE : 0);
        last unless $more;
    }

    return;
}

while (1) {
    say "polling ...";
    $poller->poll();
    say "... polled";

    # Client request -> worker.
    forward_message($frontend, $backend)
        if $poller->has_event('frontend');

    # Worker reply -> client.
    forward_message($backend, $frontend)
        if $poller->has_event('backend');

    # Anything a worker published -> our own PUB socket.
    foreach my $subscriber ( @subscribers ) {
        next unless $poller->has_event('subscriber:' . $subscriber->{id});

        say "Got event on subscriber:" . $subscriber->{id};
        forward_message($subscriber->{socket}, $publisher);
    }
}
#!perl
use 5.10.0;
use strict;
use warnings;
use Data::UUID;
use ZeroMQ qw/:all/;
# Worker.
#
# Usage: worker <publisher-port>
#
# Connects a REP socket to the coordinator address and binds a PUB socket on
# tcp://127.0.0.1:<publisher-port>.  For each request it replies with a
# freshly generated uuid string, then publishes messages "<uuid> [ 0 ]"
# through "<uuid> [ 1000 ]" followed by "<uuid> " (empty payload) as the
# last message of the run.
my $publisher_port = shift;

# Without a port the bind address would be built from undef; fail early
# with a usage message instead.
die "usage: $0 <publisher-port>\n"
    unless defined $publisher_port && $publisher_port =~ /\A\d+\z/;

my $coord_addr     = 'tcp://127.0.0.1:5555';
my $publisher_addr = 'tcp://127.0.0.1:' . $publisher_port;

say "Welcome to the worker ....";

my $cxt = ZeroMQ::Context->new;

my $coordinator = $cxt->socket(ZMQ_REP);
$coordinator->connect($coord_addr);
say "coordinator connected to $coord_addr";

my $publisher = $cxt->socket(ZMQ_PUB);
$publisher->bind($publisher_addr);
say "publisher bound to $publisher_addr";

# One generator is enough; there is no need to rebuild it per request.
my $uuid_generator = Data::UUID->new;

while (1) {
    my $request = $coordinator->recv;
    my $value   = $request->data;
    say "<= got value=($value) on coordinator ...";

    my $uuid = $uuid_generator->create_str;
    say "=> sending uuid=($uuid)";
    $coordinator->send( $uuid );

    # now publish ...
    # NOTE(review): publishing starts right after the reply is sent.  A
    # subscriber that has not yet installed its filter for this uuid will
    # presumably miss the early messages (possibly all of them) -- confirm
    # against the consumer and consider a handshake if that matters.
    foreach my $i ( 0 .. 1000 ) {
        my $payload = "$uuid [ $i ]";
        say "sending $payload";
        $publisher->send($payload);
    }

    say "publishing completed, sending empty value to subscriber ...";
    $publisher->send("$uuid ");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment