Skip to content

Instantly share code, notes, and snippets.

@pbosetti
Created January 5, 2011 08:28
Show Gist options
  • Save pbosetti/766056 to your computer and use it in GitHub Desktop.
Save pbosetti/766056 to your computer and use it in GitHub Desktop.
Simple InterProcessCommunication library
#!/usr/bin/env ruby
# test
#
# Created by Paolo Bosetti on 2011-01-04.
# Copyright (c) 2011 University of Trento. All rights reserved.
#
require "yaml"
require "socket"
require "timeout"
require "fileutils"
# Socket Abstraction class: single interface for both unix and UDP sockets.
# @author Paolo Bosetti
# @todo Add file-based interface
class SimpleSocket
LOCALHOST = '127.0.0.1'
# Initializer
# @param [Hash] args a hash of configurayion parameters
# @option args [Fixnum] :port udp port
# @option args [String] :host host address (name or udp, default to LOCALHOST constant)
# @option args [Symbol] :kind type of socket: :unix or :udp
# @option args [true,false] :force if true, reopens a Unix socket even if a stale socket file exists
def initialize(args = {})
@cfg = {:port => 5000, :host => LOCALHOST, :kind => :unix, :force => true }
@cfg.merge! args
case @cfg[:kind]
when :unix
@socket_file = "/tmp/#{$0}.sock"
@socket = nil
when :udp
@socket = UDPSocket.new
else
raise ArgumentError, "Either :unix or :udp allowed"
end
@open = false
end
# Opens the connection.
# @return [true,false] false if already open, true otherwise
def connect
return false if @open
case @cfg[:kind]
when :unix
@socket = UNIXSocket.open(@socket_file)
when :udp
@socket.connect(@cfg[:host], @cfg[:port])
end
@open = true
end
# Prints a string to the socket
# @return [nil]
def print(string)
@socket.print(string)
end
# Sets itself in sorver mode and start listening. If @cfg[:force] is true,
# removes any existing stale socket file, then retries to open the socket.
# @raise [Errno::EADDRINUSE] when the socet is busy
def listen
case @cfg[:kind]
when :unix
@socket = UNIXServer.open(@socket_file).accept
when :udp
@socket.bind(LOCALHOST, @cfg[:port])
end
rescue Errno::EADDRINUSE
if @cfg[:force] then
FileUtils::rm(@socket_file)
retry
else
puts $!
end
end
# Reads from socket, blocking mode.
# @param [Fixnum] bytes the number of bytes to be read.
# @return [String] the read packet
def recvfrom(bytes)
@socket.recvfrom(bytes)
end
# Reads from socket, non-blocking mode.
# @param [Fixnum] bytes the number of bytes to be read.
# @raise []
# @return [String] the read packet
def recv_nonblock(bytes)
@socket.recv_nonblock(bytes)
end
# Closes the socket, removing any possible socket file.
# @return [nil]
def close
@socket.close
@open = false
FileUtils::rm(@socket_file) if @socket_file
end
end
# Implements a simple inter-process communication. By default, it transfers
# objects serialized by YAML, although different serialization methods can
# be specified runtime thanks to blocks.
# @example Server:
# from_client = SimpleIPC.new :port => 5000, :timeout => 0, :kind => :udp
# from_client.listen
# from_client.get # YAML serialization
# from_client.get {|s| s.split(',').map {|e| e.to_f}} # Custom srlzn
# from_client.get {|s| s.unpack('N4')} # Binary pack srlzn
# from_client.close
# @example Client:
# to_server = SimpleIPC.new :port => 5000, :kind => :udp
# to_server.send([1,2,3,4, "test"]) # YAML serialization
# to_server.send([1,2,3,4]) {|o| o * ","} # Custom srlzn
# to_server.send([1,2,3,4]) {|o| o.pack("N4")} # Binary pack srlzn
# to_server.close
# @author Paolo Bosetti
class SimpleIPC
LENGTH_CODE = 'N'
LENGTH_SIZE = [0].pack(LENGTH_CODE).size
LOCALHOST = '127.0.0.1'
attr_accessor :cfg
attr_reader :stream_size
# Initializer
# @param [Hash] args a hash of configurayion parameters
# @option args [Fixnum] :port udp port
# @option args [String] :host host address (name or udp, default to LOCALHOST constant)
# @option args [Numeric] :timeout timeout to be used in blocking mode
# @option args [Symbol] :kind type of socket: :unix or :udp
def initialize(args = {})
@cfg = {:port => 5000, :host => LOCALHOST, :timeout => 1.0, :kind => :udp }
@cfg.merge! args
@stream_size
@socket = SimpleSocket.new @cfg
end
# Sends an object.
# @param [Object] something an object to be serialized and sent
# @return [String] the serialized payload
# @yield [Object] If a block is given, passes the received object to the block as o to be serialized
# @yieldparam [Object] o the object to be sent (anything serializable)
# @yieldreturn [String] the object serialized as a String
def send(something)
if block_given? then
payload = yield(something)
else
payload = YAML.dump(something)
end
length = [payload.size].pack(LENGTH_CODE)
@socket.connect
@socket.print(length)
@socket.print(payload)
return payload
end
# Puts itself in server mode and starts listening
def listen
@socket.listen
end
# When in server mode, call this method to read a message. If no block is
# given, it assumes that the object was passed as serialized by YAML.
# Otherwise a deserialization block must be provided.
# @return [Object,nil] the deserialized Object, or nil in case of timeout.
# @yield [String] the block must deserialize the String and return an Object
def get
result = nil
begin
if @cfg[:timeout] > 0 and !@cfg[:nonblock] then
Timeout::timeout(@cfg[:timeout]) do |to|
result = get_
end
else
result = get_
end
rescue Timeout::Error
result = nil
end
if block_given? then
return yield(result)
else
return YAML.load(result)
end
rescue Errno::EAGAIN
return nil
end
# Closes the socket.
def close
@socket.close
end
private
def get_
if @cfg[:nonblock] then
msg, sender = @socket.recv_nonblock(LENGTH_SIZE)
return nil if msg == ""
else
msg, sender = @socket.recvfrom(LENGTH_SIZE)
end
@stream_size = msg.unpack(LENGTH_CODE)[0]
msg, sender = @socket.recvfrom(@stream_size)
return msg
end
end
if __FILE__ == $0 then
class Array
def mean
self.inject {|sum,i| sum + i} / size
end
def sd
m = self.mean
v = self.inject(0.0) {|sum,i| sum + (i - m)**2}
Math::sqrt(v / (size - 1.0))
end
end
if ARGV[0] == "server" then
require "pp"
from_client = SimpleIPC.new :port => 5000, :timeout => 0, :kind => :udp
from_client.listen
from_client.cfg[:nonblock] = false
now = 0.0
data = {id:[], time: [], bytes: [], sent: [], cont:[]}
while true do
result = from_client.get
break if result == "stop"
data[:time] << Time.now
data[:sent] << result[1]
data[:id] << result[0]
data[:cont] << result[2]
data[:bytes] << from_client.stream_size
end
duration = Time.now - data[:sent][0]
transferred = data[:bytes].inject {|b,sum| sum + b}
latency = []
data[:time].each_with_index do |t,i|
latency << (data[:time][i] - data[:sent][i]) * 1000
puts "%4d: %10.5f -> %10.5f = %9.3fms (%d bytes)" % [
data[:id][i],
data[:sent][i] - data[:sent][0],
data[:time][i] - data[:sent][0],
latency[i],
data[:bytes][i]
]
end
puts
puts "transferred %d bytes in %12.6f s @ %d kbps" % [
transferred,
duration,
(transferred / duration * 8 / 1000).to_i
]
puts "average latency %9.3f ms, st.dev %9.3f ms" % [
latency.mean,
latency.sd
]
puts "minimum latency %9.3f ms, maximum latency %9.3f ms" % [
latency.min,
latency.max
]
else
to_server = SimpleIPC.new :port => 5000, :kind => :udp
n = ARGV[0].to_i
n = 10 if n <= 0
now = 0.0
n.times do |i|
to_server.send([i,Time.now,"test" * (rand(100)+1)])
end
to_server.send("stop")
to_server.close
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment