Skip to content

Instantly share code, notes, and snippets.

@h4ck3rm1k3
Created August 19, 2012 04:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save h4ck3rm1k3/3391956 to your computer and use it in GitHub Desktop.
Save h4ck3rm1k3/3391956 to your computer and use it in GitHub Desktop.
ZeroMQ example
from http://stackoverflow.com/questions/12017932/function-exported-but-the-script-search-it-in-the-wrong-package/12021440#comment16043474_12021440
package Functions::ServerSocket;
################################################
# This package will contain the functions to send and receive message
# on the server side
################################################
use strict;
use warnings;
use ZeroMQ qw/:all/;
#use Functions::Testd qw(get_interface_address);
# Next lines are needed to export subroutines to the main package
use base 'Exporter';
use vars qw/ @EXPORT_OK /;
@EXPORT_OK = qw(start_socket receive_msg send_msg end_msg close_socket term_ctxt send_ip);
# Modify these variables to change the endpoint the daemon socket binds to.
# start_socket() concatenates them as "${socket_protocol}${socket_path}".
my $socket_path = '*:6650';
my $socket_protocol = 'tcp://';
# State shared among the functions of this package.
# $ctxt   - ZeroMQ context, created in start_socket() and released in term_ctxt().
my $ctxt;
# $socket - daemon socket, created in start_socket() and released in close_socket().
my $socket;
# $sender - identity recorded by receive_msg(); select_recipient() sends it
#           and resets it to undef.
my $sender = undef;
# Start the daemon socket.
#
# Creates the package-wide ZeroMQ context and a ZMQ_ROUTER socket, sets
# ZMQ_LINGER to 0 on it and binds it to "${socket_protocol}${socket_path}".
# When the protocol is 'ipc://', the socket file is made accessible to
# everybody (mode 0777) once the bind has succeeded.
#
# Returns 1 if the bind succeeded, 0 otherwise.
# Dies if the context or the socket cannot be created.
sub start_socket {
    $ctxt = ZeroMQ::Raw::zmq_init(5) || die "Couldn't initialise ZeroMQ context.\n";
    $socket = ZeroMQ::Raw::zmq_socket($ctxt, ZMQ_ROUTER) || die "Couldn't create socket.\n";

    ZeroMQ::Raw::zmq_setsockopt($socket, ZMQ_LINGER, 0);

    my $rc = ZeroMQ::Raw::zmq_bind( $socket, "${socket_protocol}${socket_path}" );

    # Nothing was bound, so there is no socket file to adjust either.
    return 0 if $rc != 0;

    if ($socket_protocol =~ m{^ipc://$}) {
        # Use the chmod built-in instead of spawning a shell: the path is
        # never interpreted by a shell and a failure can be reported.
        chmod(0777, $socket_path)
            or warn "Couldn't change permissions of $socket_path: $!\n";
    }

    return 1;
}
# Receive a message from the daemon socket.
#
# A payload that consists of a single blank-free token equal to its own
# upper-case form is treated as the identity of the connecting peer: it is
# stored in $sender and a "Receiving connection from ..." notice is returned.
# Any other payload is returned unchanged.
#
# Returns undef (after printing an error) when no data could be retrieved
# from the received message.
sub receive_msg {
    my $msg = ZeroMQ::Raw::zmq_recv($socket);
    my $line = ZeroMQ::Raw::zmq_msg_data($msg);

    # Test for definedness, not truth: "0" and "" are legitimate payloads.
    unless (defined $line) {
        print "Couldn't retrieve pointer to data: $!\n";
        # Stop here rather than splitting an undefined value; the caller
        # still receives undef, as before.
        return $line;
    }

    my @check = split /[[:blank:]]/, $line;
    if (scalar(@check) == 1 and $check[0] eq uc($check[0])) {
        $sender = $line;
        return "Receiving connection from $sender.\n";
    }

    return $line;
}
# Address the reply: send the identity stored in $sender as a leading
# message part (ZMQ_SNDMORE), then forget it.
sub select_recipient {
    ZeroMQ::Raw::zmq_send( $socket, $sender, ZMQ_SNDMORE );
    print "Sending message to $sender:\n";

    # Clear the identity so that it is not sent a second time for the
    # same reply.
    undef $sender;
}
# Send one part of a reply.
#
# Takes the text to send as its only argument. The part is always sent with
# ZMQ_SNDMORE; end_msg() sends the closing part.
sub send_msg {
    my ($msg) = @_;

    # A pending identity has to go out before the first part of the output.
    select_recipient() if defined $sender;

    ZeroMQ::Raw::zmq_send( $socket, $msg, ZMQ_SNDMORE );

    # The trailing newline is dropped for the log line only, after sending.
    chomp $msg;
    print "MSG: \"$msg\" sent.\n";
}
# Close a reply. Use this function to tell the shell that no more output
# will arrive: it sends "END\n" without ZMQ_SNDMORE.
sub end_msg {
    # A pending identity has to go out before the closing part.
    select_recipient() if defined $sender;

    ZeroMQ::Raw::zmq_send( $socket, "END\n" );
    print "MSG: \"END\" sent.\n";
}
# Close the daemon socket.
# Dies if zmq_close reports -1.
sub close_socket {
    my $rc = ZeroMQ::Raw::zmq_close($socket);
    die "Unable to close the socket: $!.\n" if $rc == -1;
}
# Terminate the ZeroMQ context of the daemon.
# Dies if zmq_term reports -1.
sub term_ctxt {
    my $rc = ZeroMQ::Raw::zmq_term($ctxt);
    die "Unable to term ZeroMQ context: $!.\n" if $rc == -1;
}
# Push the address of this daemon ("<ip>:6650") to a listening peer.
#
# Positional parameters, all optional:
#   $socket_path     - endpoint to connect to (default '127.0.0.1:6651')
#   $socket_protocol - protocol prefix, including '://' (default 'tcp://')
#   $interface       - interface name handed to
#                      Functions::Testd::get_interface_address
#
# The function works on its own private context and ZMQ_PUSH socket; the
# package-wide daemon socket is not touched. Nothing is sent if the
# connection cannot be set up or if no address is found for the interface;
# a warning is emitted instead. The private socket and context are released
# in either case.
#
# Returns the result of zmq_term on the private context.
# Dies if the context or the socket cannot be created.
sub send_ip {
    my ($socket_path, $socket_protocol, $interface) = @_;

    # Modify these values to change the default endpoint.
    $socket_path     = '127.0.0.1:6651' unless defined $socket_path;
    $socket_protocol = 'tcp://'         unless defined $socket_protocol;

    # Functions::Testd is not loaded at the top of this package, so make
    # sure the helper is available before the call below.
    unless (defined &Functions::Testd::get_interface_address) {
        require Functions::Testd;
    }

    my $push_ctxt = ZeroMQ::Raw::zmq_init(5)
        || die "Couldn't initialise ZeroMQ context.\n";

    my $push_socket = ZeroMQ::Raw::zmq_socket($push_ctxt, ZMQ_PUSH);
    unless ($push_socket) {
        # Do not leave the freshly created context behind.
        ZeroMQ::Raw::zmq_term($push_ctxt);
        die "Couldn't create socket.\n";
    }

    ZeroMQ::Raw::zmq_setsockopt($push_socket, ZMQ_IDENTITY, 'DAEMON');

    my $endpoint = "${socket_protocol}${socket_path}";
    my $rc = ZeroMQ::Raw::zmq_connect($push_socket, $endpoint);

    if ($rc != 0) {
        # Same success convention as the zmq_bind check in start_socket.
        warn "Couldn't connect to $endpoint: $!\n";
    }
    else {
        my $daemon_ip = Functions::Testd::get_interface_address($interface);
        if (defined $daemon_ip) {
            warn "DEBUG:daemon_ip :$daemon_ip";
            ZeroMQ::Raw::zmq_send($push_socket, "$daemon_ip:6650");
        }
        else {
            # Sending ":6650" would hand the peer an unusable address.
            warn "Couldn't retrieve the address of the requested interface.\n";
        }
    }

    ZeroMQ::Raw::zmq_close($push_socket);
    return ZeroMQ::Raw::zmq_term($push_ctxt);
}
1;
# test the code
use Functions::Testd;
# send_ip() concatenates protocol and path, so the protocol must carry its
# '://' suffix (as in 'tcp://'); "ipc" alone would yield "ipc/tmp/path".
# "interface" is a placeholder for a real interface name.
Functions::ServerSocket::send_ip("/tmp/path","ipc://","interface");
Functions::Testd::stop_daemon();
package Functions::Testd;
#########################################
# This package holds all the functions needed to maintain the daemon's
# environment
#########################################
use strict;
use warnings;
use Functions::ServerSocket qw(send_msg close_socket term_ctxt end_msg);
use Scalar::Util qw(looks_like_number);
use Socket;
require 'sys/ioctl.ph';
# Next lines are needed to export subroutine to the main package
use base 'Exporter';
use vars qw/ @EXPORT_OK /;
@EXPORT_OK = qw(stop_daemon get_interface_address);
# Tear down the daemon endpoint: close the socket, terminate the context and
# delete the file /tmp/server.ipc.
sub remove_socket {
    close_socket();
    term_ctxt();
    return unlink '/tmp/server.ipc';
}
# Kill all remaining processes that were started by the daemon.
#
# Lists the processes of user cvmfs-test with ps (dropping lines that mention
# defunct, cvmfs-test, PID or grep), extracts their PIDs and sends each of
# them the TERM signal (15). The outcome for every PID is printed to STDOUT.
sub killing_child {
    # Retrieving the list of processes
    my @ps_lines = `ps -u cvmfs-test -o pid,args | grep -v defunct | grep -v cvmfs-test | grep -v PID | grep -v grep`;

    # Retrieving pids from the process list. On some systems the PID is
    # preceded by blanks, so skip any leading whitespace and take the first
    # run of digits; lines without one are ignored instead of yielding an
    # empty "pid".
    my @pids;
    for my $ps_line (@ps_lines) {
        push @pids, $1 if $ps_line =~ /^\s*(\d+)/;
    }

    for my $pid (@pids) {
        # $pid is all digits here, so it is safe inside the shell command.
        my $still_listed = `ps -u cvmfs-test -p $pid | grep $pid`;
        unless ($still_listed) {
            print "No process with PID $pid\n";
            next;
        }

        # The result is scoped to this iteration: an earlier successful
        # kill must not make this process look terminated as well.
        my $signalled = 0;
        if (kill(0, $pid) > 0) {
            print "Sending TERM signal to process $pid ... ";
            $signalled = kill 15, $pid;
        }

        if ($signalled > 0) {
            print "Process terminated.\n";
        }
        else {
            print "Impossible to terminate the process $pid\n";
        }
    }

    return;
}
# Handler for the daemon's 'stop' command: runs the cleanup functions and
# exits with status 0. This function does not return.
sub stop_daemon {
    # Get rid of whatever the daemon left running.
    killing_child();

    # Last pieces of output for the client, followed by the closing part.
    for my $farewell ("Daemon stopped.\n", "DAEMON_STOPPED\n") {
        send_msg($farewell);
    }
    end_msg();

    # The socket goes away only now, once there is no more output to send.
    remove_socket();

    print "Daemon stopped.\n";
    STDOUT->flush;
    exit(0);
}
# Look up the IPv4 address of a network interface.
#
# Takes the interface name and queries it with the SIOCGIFADDR ioctl on a
# throw-away socket. Returns the address in dotted notation, or undef if the
# ioctl fails. Dies if the socket cannot be created.
sub get_interface_address {
    my ($iface) = @_;

    socket(my $probe, PF_INET, SOCK_STREAM, (getprotobyname('tcp'))[2]) || die "unable to create a socket: $!\n";

    # The request buffer starts with the interface name, padded with NULs
    # to 256 bytes; the ioctl writes its answer into the same buffer.
    my $request = pack('a256', $iface);
    return undef unless ioctl($probe, SIOCGIFADDR(), $request);

    # Four bytes are read at offset 20 of the buffer.
    my @octets = unpack('x20 C4', $request);
    return undef unless @octets;

    return join('.', @octets);
}
1;
@beginneer
Copy link

Thanks. I have a problem with ZeroMQ: I installed it with the open-source Storm, and when I submitted a topology that uses port 6703 I got this:
org.zeromq.ZMQException: Address already in use(0x62)
at org.zeromq.ZMQ$Socket.bind(Native Method)
Do you know how I can fix it?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment