Created
August 19, 2012 04:23
-
-
Save h4ck3rm1k3/3391956 to your computer and use it in GitHub Desktop.
ZeroMQ example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from http://stackoverflow.com/questions/12017932/function-exported-but-the-script-search-it-in-the-wrong-package/12021440#comment16043474_12021440 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package Functions::ServerSocket; | |
################################################ | |
# This package will contain the functions to send and receive message | |
# on the server side | |
################################################ | |
use strict; | |
use warnings; | |
use ZeroMQ qw/:all/; | |
#use Functions::Testd qw(get_interface_address); | |
# Next lines are needed to export subroutines to the main package | |
use base 'Exporter'; | |
use vars qw/ @EXPORT_OK /; | |
@EXPORT_OK = qw(start_socket receive_msg send_msg end_msg close_socket term_ctxt send_ip); | |
# Modify this variables to change the path to the socket | |
my $socket_path = '*:6650'; | |
my $socket_protocol = 'tcp://'; | |
# Variables shared among all functions | |
my $ctxt; | |
my $socket; | |
my $sender = undef; | |
# Starting the daemon socket | |
sub start_socket { | |
$ctxt = ZeroMQ::Raw::zmq_init(5) || die "Couldn't initialise ZeroMQ context.\n"; | |
$socket = ZeroMQ::Raw::zmq_socket($ctxt, ZMQ_ROUTER) || die "Couldn't create socket.\n"; | |
ZeroMQ::Raw::zmq_setsockopt($socket, ZMQ_LINGER, 0); | |
my $rc = ZeroMQ::Raw::zmq_bind( $socket, "${socket_protocol}${socket_path}" ); | |
if ($socket_protocol =~ m/^ipc:\/\/$/) { | |
system("chmod 777 $socket_path"); | |
} | |
if ($rc != 0) { | |
return 0; | |
} | |
else { | |
return 1; | |
} | |
} | |
# Receiving a message | |
sub receive_msg { | |
my $msg = ZeroMQ::Raw::zmq_recv($socket); | |
my $line = ZeroMQ::Raw::zmq_msg_data($msg); | |
unless ($line) { | |
print "Couldn't retrieve pointer to data: $!\n"; | |
} | |
my @check = split /[[:blank:]]/, $line; | |
if (scalar(@check) == 1 and $check[0] eq uc($check[0])){ | |
$sender = $line; | |
return "Receiving connection from $sender.\n"; | |
} | |
else { | |
return $line; | |
} | |
} | |
# Select the recipient socket | |
sub select_recipient { | |
ZeroMQ::Raw::zmq_send($socket, $sender, ZMQ_SNDMORE); | |
print "Sending message to $sender:\n"; | |
# Undefining $sender to avoid multiple use of this function for the same message | |
$sender = undef; | |
} | |
# Send a message | |
sub send_msg { | |
# Retrieve message | |
my $msg = shift; | |
# If $sender is not undef, sending the first part of output with socket identity | |
if (defined($sender)){ | |
select_recipient(); | |
} | |
ZeroMQ::Raw::zmq_send($socket, $msg, ZMQ_SNDMORE); | |
chomp($msg); | |
print "MSG: \"$msg\" sent.\n"; | |
} | |
# End message. Use this functions to tell the shell that no more output will arrive. | |
sub end_msg { | |
# If $sender is not undef, sending the first part of output with socket identity | |
if (defined($sender)){ | |
select_recipient(); | |
} | |
ZeroMQ::Raw::zmq_send($socket, "END\n"); | |
print "MSG: \"END\" sent.\n"; | |
} | |
# Close the socket | |
sub close_socket { | |
my $socket_closed = ZeroMQ::Raw::zmq_close($socket); | |
if ($socket_closed == -1) { | |
die "Unable to close the socket: $!.\n"; | |
} | |
} | |
# Term ZeroMQ context | |
sub term_ctxt { | |
my $ctxt_closed = ZeroMQ::Raw::zmq_term($ctxt); | |
if ($ctxt_closed == -1) { | |
die "Unable to term ZeroMQ context: $!.\n"; | |
} | |
} | |
sub send_ip { | |
my $socket_path = shift; | |
my $socket_protocol = shift; | |
my $interface = shift; | |
# Modify this variables to change the default path to the socket | |
unless (defined($socket_path)) { | |
$socket_path = '127.0.0.1:6651'; | |
} | |
unless (defined($socket_protocol)) { | |
$socket_protocol = 'tcp://'; | |
} | |
my $ctxt = ZeroMQ::Raw::zmq_init(5) || die "Couldn't initialise ZeroMQ context.\n"; | |
my $socket = ZeroMQ::Raw::zmq_socket($ctxt, ZMQ_PUSH) || die "Couldn't create socket.\n"; | |
my $setopt = ZeroMQ::Raw::zmq_setsockopt($socket, ZMQ_IDENTITY, 'DAEMON'); | |
my $rc = ZeroMQ::Raw::zmq_connect( $socket, "${socket_protocol}${socket_path}" ); | |
my $daemon_ip = Functions::Testd::get_interface_address($interface); | |
warn "DEBUG:daemon_ip :$daemon_ip"; | |
ZeroMQ::Raw::zmq_send($socket, "$daemon_ip:6650"); | |
ZeroMQ::Raw::zmq_close($socket); | |
ZeroMQ::Raw::zmq_term($ctxt); | |
} | |
1; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# test the code | |
use Functions::Testd; | |
Functions::ServerSocket::send_ip("/tmp/path","ipc","interface"); | |
Functions::Testd::stop_daemon(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package Functions::Testd; | |
######################################### | |
# Here will be stored all the functions needed for the daemon's environement | |
# mantainance | |
######################################### | |
use strict; | |
use warnings; | |
use Functions::ServerSocket qw(send_msg close_socket term_ctxt end_msg); | |
use Scalar::Util qw(looks_like_number); | |
use Socket; | |
require 'sys/ioctl.ph'; | |
# Next lines are needed to export subroutine to the main package | |
use base 'Exporter'; | |
use vars qw/ @EXPORT_OK /; | |
@EXPORT_OK = qw(stop_daemon get_interface_address); | |
# This function will close the socket, the context and unlink the file. | |
sub remove_socket { | |
close_socket(); | |
term_ctxt(); | |
unlink('/tmp/server.ipc'); | |
} | |
# This function will kill all remaining process that were started by the daemon | |
sub killing_child { | |
# Retrieving the list of processes | |
my @process = `ps -u cvmfs-test -o pid,args | grep -v defunct | grep -v cvmfs-test | grep -v PID | grep -v grep`; | |
# Array to store all pids | |
my @pids; | |
# Retrieving pids from the process list | |
foreach (@process) { | |
my @pid = (split /[[:blank:]]/, $_); | |
# I found that in some system the same command has a space before the pid | |
# I'm looking which one between the first two fields looks like a number. | |
if (looks_like_number($pid[0])){ | |
push @pids,$pid[0]; | |
} | |
else { | |
push @pids,$pid[1]; | |
} | |
} | |
my ($cnt, $success); | |
foreach(@pids){ | |
my $process = `ps -u cvmfs-test -p $_ | grep $_`; | |
if ($process) { | |
$cnt = kill 0, $_; | |
if ($cnt > 0) { | |
print "Sending TERM signal to process $_ ... "; | |
$success = kill 15, $_; | |
} | |
if ( defined($success) and $success > 0) { | |
print "Process terminated.\n"; | |
} | |
else { | |
print "Impossible to terminate the process $_\n"; | |
} | |
} | |
else { | |
print "No process with PID $_\n"; | |
} | |
} | |
} | |
# This functions is called when the daemon gets the 'stop' command. It launches | |
# some cleaning functions and exit. | |
sub stop_daemon { | |
# Killing all remaining process | |
killing_child(); | |
# Printing to the FIFO the last log. | |
send_msg("Daemon stopped.\n"); | |
send_msg("DAEMON_STOPPED\n"); | |
end_msg(); | |
# Removing the socket. Do it only when you're sure you don't have any more output to send. | |
remove_socket(); | |
print "Daemon stopped.\n"; | |
STDOUT->flush(); | |
exit 0; | |
} | |
# This function will accept a network interface and will retrieve the network ip for that interface | |
sub get_interface_address { | |
my $iface = shift; | |
my $socket; | |
socket($socket, PF_INET, SOCK_STREAM, (getprotobyname('tcp'))[2]) || die "unable to create a socket: $!\n"; | |
my $buf = pack('a256', $iface); | |
if (ioctl($socket, SIOCGIFADDR(), $buf) && (my @address = unpack('x20 C4', $buf))) { | |
return join('.', @address); | |
} | |
return undef; | |
} | |
1; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
thanks , i have problem wih zeromq i installed it with storm open source when i submitted topology which using port 6703 got this
org.zeromq.ZMQException: Address already in use(0x62)
at org.zeromq.ZMQ$Socket.bind(Native Method)
do you know how can i fix it ?