Skip to content

Instantly share code, notes, and snippets.

@stevan
Created April 7, 2011 22:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stevan/908941 to your computer and use it in GitHub Desktop.
Save stevan/908941 to your computer and use it in GitHub Desktop.
#!perl
use 5.10.0;
use strict;
use warnings;
use Data::Dumper;
use ZeroMQ qw/:all/;
# Interactive client for the coordinator/publisher demo.
#
# Protocol, as exercised by the code below:
#   1. Send a command read from input to the coordinator (REQ socket).
#   2. Receive either a subscription filter or the text 'unknown command'.
#   3. Subscribe to that filter on the SUB socket, then send an empty
#      acknowledgement so the other side starts publishing.
#   4. Read published messages until one arrives whose payload (the text
#      after "<filter> ") is empty.
#   5. Receive the coordinator's final reply, completing the REQ/REP cycle.
my $coord_addr     = 'tcp://127.0.0.1:5566';
my $publisher_addr = 'tcp://127.0.0.1:5567';

my $cxt = ZeroMQ::Context->new;

say "welcome to my client ...";

# Both sockets connect() (they do not bind), so the log lines say so.
my $coordinator = $cxt->socket(ZMQ_REQ);
$coordinator->connect($coord_addr);
say "coordinator connected to $coord_addr";

my $subscriber = $cxt->socket(ZMQ_SUB);
$subscriber->connect($publisher_addr);
say "subscriber connected to $publisher_addr";

COMMAND:
while (1) {
    print "enter a command:";
    my $command = <>;

    # <> yields undef at end of input; stop instead of looping forever
    # on an undefined command.
    last COMMAND unless defined $command;
    chomp $command;

    say "sending $command to server";
    $coordinator->send($command);

    say "waiting for subscription filter ...";
    my $filter = $coordinator->recv->data;
    say "Got back subscription filter=($filter)";

    # Normalise the reply (not the literal) before comparing.
    if ( lc $filter eq 'unknown command' ) {
        say "Got 'unknown command' error";
        next COMMAND;
    }

    $subscriber->setsockopt( ZMQ_SUBSCRIBE, $filter );
    say "subscription filter applied to subscriber";

    say "sending acknowledgement to coordinator to start";
    $coordinator->send('');

    say "starting subscription ...";

    MESSAGE:
    while (1) {
        my $data = $subscriber->recv->data;
        say "got $data";

        # Strip the "<filter> " prefix; \Q..\E keeps the filter literal so
        # regex metacharacters in it cannot change the match.
        $data =~ s/\A\Q$filter\E //;

        # An empty payload marks the end of the stream. Test the length so
        # that a payload of "0" is not mistaken for the terminator.
        last MESSAGE unless length $data;

        sleep(1);
    }

    say "subscription done ...";

    # Drop this round's filter so filters do not accumulate on the socket.
    $subscriber->setsockopt( ZMQ_UNSUBSCRIBE, $filter );

    say "grabbing acknowledgement from coordinator that subscription is complete";
    $coordinator->recv;
}
#!perl
use 5.10.0;
use strict;
use warnings;
use Data::Dumper;
use Data::UUID;
use ZeroMQ qw/:all/;
# Server side of the coordinator/publisher demo.
#
# The coordinator (REP socket) handles one command at a time:
#   * 'select' (case-insensitive): reply with a fresh UUID to be used as a
#     subscription filter, wait for the client's acknowledgement, publish
#     "<uuid> [ 0 ]" .. "<uuid> [ 10 ]" on the PUB socket followed by
#     "<uuid> " (empty payload) as an end marker, then send an empty
#     completion reply on the coordinator.
#   * anything else: reply with 'unknown command'.
my $coord_addr     = 'tcp://127.0.0.1:5566';
my $publisher_addr = 'tcp://127.0.0.1:5567';

say "Welcome to my server ....";

my $cxt = ZeroMQ::Context->new;

my $coordinator = $cxt->socket(ZMQ_REP);
$coordinator->bind($coord_addr);
say "coordinator bound to $coord_addr";

my $publisher = $cxt->socket(ZMQ_PUB);
$publisher->bind($publisher_addr);
say "publisher bound to $publisher_addr";

# One generator is enough; only the generated strings differ per request.
my $uuid_gen = Data::UUID->new;

REQUEST:
while (1) {
    my $request = $coordinator->recv;
    next REQUEST unless $request;

    my $value = $request->data;
    say "Got command=($value) on coordinator ...";

    if ( lc $value ne 'select' ) {
        say "unrecognized command ...";
        $coordinator->send("unknown command");
        next REQUEST;
    }

    say "processing select ...";
    my $uuid = $uuid_gen->create_str;

    say "sending subscription filter=($uuid)";
    $coordinator->send($uuid);

    say "awaiting confirmation from subscriber ...";

    # A REP socket must alternate recv/send. If the acknowledgement was not
    # received there is no request to answer, so go back to receiving
    # rather than attempting a reply.
    next REQUEST unless $coordinator->recv;

    say "got confirmation from subscriber, starting to publish ...";
    for my $i ( 0 .. 10 ) {
        my $payload = "$uuid [ $i ]";
        say "sending $payload";
        $publisher->send($payload);
    }

    say "publishing completed, sending empty value to subscriber ...";
    $publisher->send("$uuid ");

    say "select command completed, notify the coordinator";
    $coordinator->send('');
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment