Skip to content

Instantly share code, notes, and snippets.

@gnarg
Created October 26, 2012 09:03
Show Gist options
  • Save gnarg/3957752 to your computer and use it in GitHub Desktop.
Save gnarg/3957752 to your computer and use it in GitHub Desktop.
socketpair protocol for druby
require 'socket'
require 'drb/drb'
module DRbSocketPair
def self.open(uri, config={})
tag = parse_uri(uri)
parent_socket, child_socket = @parent.sockets[tag]
parent_socket.close unless parent_socket.closed?
Child.new(child_socket, config)
end
def self.open_server(uri, config={})
parse_uri(uri)
@parent = Parent.new(config)
end
def self.uri_option(uri, config={})
parse_uri(uri)
return uri, nil
end
def self.before_fork(tag)
@parent.sockets[tag.to_s] = Socket.pair(Socket::AF_UNIX,
Socket::SOCK_STREAM, 0)
@parent.wake
end
def self.fork(tag, &block)
before_fork(tag)
Process.fork(&block)
end
def self.parse_uri(uri)
if uri =~ /^drbsocketpair:(.*)/
$1
else
raise DRb::DRbBadScheme.new
end
end
class Parent
attr_reader :sockets
def initialize(config)
@config = config
@sockets = {}
@wake_out, @wake_in = IO.pipe
end
def accept
loop do
@sockets.delete_if {|_,(s,_)| s.closed? }
sockets_to_listen = @sockets.values.map{|s,_| s }
ready = IO.select(sockets_to_listen + [@wake_out])[0][0]
if ready == @wake_out
ready.read(1)
next
else
tag, (parent_socket, child_socket) = @sockets.find {|_,(s,_)| s == ready }
@sockets.delete(tag)
child_socket.close unless child_socket.closed?
return Server.new(parent_socket, tag, @config)
end
end
end
def close
@sockets.each {|s,_| s.close unless s.closed? }
wake
end
def wake
@wake_in.write('.')
end
def uri
'drubysocketpair:'
end
end
class Server
def initialize(socket, tag, config)
@socket = socket
@tag = tag
@msg = DRb::DRbMessage.new(config)
end
def recv_request
@msg.recv_request(@socket)
end
def send_reply(succ, result)
@msg.send_reply(@socket, succ, result)
end
def close
@socket.close
end
def uri
"drbsocketpair:#{@tag}"
end
end
class Child
def initialize(socket, config)
@socket = socket
@msg = DRb::DRbMessage.new(config)
end
def send_request(ref, msg_id, arg, b)
@msg.send_request(@socket, ref, msg_id, arg, b)
end
def recv_reply
@msg.recv_reply(@socket)
end
def alive?
!@socket.closed?
end
def close
@socket.close unless @socket.closed?
end
end
DRb::DRbProtocol.add_protocol(DRbSocketPair)
end
if $0 == __FILE__
class TimeServer
def get_current_time
Time.now
end
end
DRb.start_service('drbsocketpair:', TimeServer.new)
pids = []
DRbSocketPair.before_fork(0)
pids << Process.fork do
timeserver = DRbObject.new_with_uri('drbsocketpair:0')
puts "!! 0 !! #{Process.pid} #{timeserver.get_current_time}"
end
DRbSocketPair.before_fork(1)
pids << Process.fork do
timeserver = DRbObject.new_with_uri('drbsocketpair:1')
puts "!! 1 !! #{Process.pid} #{timeserver.get_current_time}"
end
pids << DRbSocketPair.fork(2) do
timeserver = DRbObject.new_with_uri('drbsocketpair:2')
puts "!! 2 !! #{Process.pid} #{timeserver.get_current_time}"
puts "!! 2 !! #{Process.pid} #{timeserver.get_current_time} again"
end
pids.each{|pid| Process.wait(pid) }
end
@seki
Copy link

seki commented Nov 3, 2012

dRuby needs open more connections.
I will explain tomorrow.

pids << DRbSocketPair.fork(3) do
timeserver = DRbObject.new_with_uri('drbsocketpair:3')
timeserver.queue.push(:a)
timeserver.queue.push(:b)
t1 = Thread.new do
p timeserver.queue.pop
end
t2 = Thread.new do
p timeserver.queue.pop
end
sleep 2
t1.join
t2.join
end

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment