Skip to content

Instantly share code, notes, and snippets.

@wesyoung
Created December 18, 2013 12:45
Show Gist options
  • Save wesyoung/8021741 to your computer and use it in GitHub Desktop.
Save wesyoung/8021741 to your computer and use it in GitHub Desktop.
it's a start.
#!/usr/bin/env perl
use 5.011;
use strict;
use warnings;
use ZMQ::LibZMQ3;
use ZMQ::Constants qw(:all); # separate module
use threads;
use Carp::Assert;
use Data::Dumper;
use constant NBR_CLIENTS => 5;
use constant NBR_WORKERS => 2;
use constant READY => "\001";
use constant BACKEND_URL => 'ipc://backend.ipc';
use constant FRONTEND_URL => 'ipc://frontend.ipc';
main();
sub client_thread {
my $i = shift;
my $identity = 'Client-'.$i;
my $ctx = zmq_ctx_new();
my $socket = zmq_socket($ctx,ZMQ_REQ);
my $rv = zmq_setsockopt($socket,ZMQ_IDENTITY,$identity);
$rv = zmq_connect($socket,FRONTEND_URL());
assert($rv == 0);
my $reply = zmq_msg_init();
while(1){
say $identity.' sending hello...';
$rv = zmq_msg_send('HELLO',$socket);
assert($rv > 0);
say "$identity waiting for reply...";
$rv = zmq_msg_recv($reply,$socket);
assert($rv > -1);
$reply = zmq_msg_init();
}
}
sub worker_thread {
my $i = shift;
my $identity = 'Worker-'.$i;
say 'starting '.$identity;
my $ctx = zmq_ctx_new();
my $socket = zmq_socket($ctx,ZMQ_REQ);
my $rv = zmq_setsockopt($socket,ZMQ_IDENTITY,$identity);
$rv = zmq_connect($socket,BACKEND_URL());
assert($rv == 0);
$rv = zmq_msg_send(READY(),$socket);
assert($rv > 0);
my ($addr,$delim,$data,$ok) = (zmq_msg_init(),zmq_msg_init(),zmq_msg_init(),zmq_msg_init_data('OK'));
my $m = zmq_msg_init();
my @msg = [];
while(1){
say "$identity waiting...";
$rv = zmq_msg_recv($addr,$socket);
$rv = zmq_msg_recv($delim,$socket,ZMQ_RCVMORE);
$rv = zmq_msg_recv($data,$socket);
say "$identity got a: ".zmq_msg_data($data);
assert($rv > -1);
say "$identity: sending OK";
$rv = zmq_msg_send($addr,$socket,ZMQ_SNDMORE);
$rv = zmq_msg_send($delim,$socket,ZMQ_SNDMORE);
$rv = zmq_msg_send($ok,$socket);
assert($rv != -1);
$m = zmq_msg_init();
}
}
sub main {
say 'starting main...';
my $client_nbr = NBR_CLIENTS();
my $ctx = zmq_ctx_new();
my $frontend = zmq_socket($ctx,ZMQ_ROUTER);
my $backend = zmq_socket($ctx,ZMQ_ROUTER);
my $rv = zmq_bind($frontend, FRONTEND_URL());
assert($rv == 0);
$rv = zmq_bind($backend, BACKEND_URL());
assert($rv == 0);
my @threads;
foreach my $t (1 .. NBR_WORKERS()){
say 'starting worker: '.$t;
push(@threads,threads->create('worker_thread',$t));
}
foreach my $t (1 .. NBR_CLIENTS()){
say 'starting client: '.$t;
push(@threads,threads->create('client_thread',$t));
}
my @workers;
my ($w_addr,$delim,$c_addr,$data);
my $items = [
{
events => ZMQ_POLLIN,
socket => $frontend,
callback => sub {
if($#workers > -1){
say 'frontend...';
my ($m,$msg);
while(1){
$m = zmq_msg_init();
$rv = zmq_msg_recv($m,$frontend);
assert($rv != -1);
push(@$msg,$m);
last unless(zmq_getsockopt($frontend,ZMQ_RCVMORE));
}
my $wrk = pop(@workers);
$rv = zmq_msg_send($wrk,$backend,ZMQ_SNDMORE);
$rv = zmq_msg_send('',$backend,ZMQ_SNDMORE);
$rv = zmq_msg_send(@$msg[0],$backend,ZMQ_SNDMORE);
$rv = zmq_msg_send('',$backend,ZMQ_SNDMORE);
$rv = zmq_msg_send(@$msg[2],$backend);
assert($rv != -1);
}
},
},
{
events => ZMQ_POLLIN,
socket => $backend,
callback => sub {
say 'backend...';
my $msg = [];
my $m;
while(1){
$m = zmq_msg_init();
$rv = zmq_msg_recv($m,$backend);
assert($rv > -1);
push(@$msg,zmq_msg_data($m));
last unless(zmq_getsockopt($backend,ZMQ_RCVMORE));
}
assert($#workers < NBR_WORKERS());
$w_addr = @$msg[0];
push(@workers,$w_addr);
$delim = @$msg[1];
assert($delim eq '');
$c_addr = @$msg[2];
if($c_addr ne READY()){
$delim = @$msg[3];
assert ($delim eq '');
$data = @$msg[4];
say 'sending '.$data.' to '.$c_addr;
$rv = zmq_msg_send($c_addr,$frontend,ZMQ_SNDMORE);
$rv = zmq_msg_send('',$frontend,ZMQ_SNDMORE);
$rv = zmq_msg_send($data,$frontend);
assert($rv > -1);
} else {
say 'worker checking in: '.$w_addr;
}
},
},
];
while(1){ zmq_poll($items); }
$_->join() for(@threads);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment