Created
May 26, 2013 15:44
-
-
Save josemic/5653157 to your computer and use it in GitHub Desktop.
Implementation of the Half-Sync/Half-Async pattern in Ruby. This gist is part of a Ruby bug report.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "singleton" | |
require "socket" | |
# Event-type bit flags: one bit per kind of event, so the reactor can test a
# mask with `&` when registering or removing handlers.
ACCEPT_EVENT  = 1 << 0
RECEIVE_EVENT = 1 << 1
WRITE_EVENT   = 1 << 2
# Y_EVENT = 1 << 3
# Z_EVENT = 1 << 4
# Etc.
# | |
# Half-Sync/Half-Async pattern,
# based on the Reactor and Active Object patterns from the POSA2 book:
# http://www.dre.vanderbilt.edu/~schmidt/PDF/HS-HA.pdf | |
# http://www.dre.vanderbilt.edu/~schmidt/PDF/Active-Objects.pdf | |
# http://www.dre.vanderbilt.edu/~schmidt/PDF/Reactor.pdf | |
# http://www.dre.vanderbilt.edu/~schmidt/PDF/Acc-Con.pdf | |
# Envelope that pairs a payload with the handle it arrived on, so whoever
# dequeues it knows where to send the reply. Both fields are read-only.
class Message
  attr_reader :io_handle, :data

  # io_handle - the handle the payload was read from
  # data      - the payload itself
  def initialize(io_handle, data)
    @io_handle, @data = io_handle, data
  end
end
# Worker pool: threads that block on a shared SynchronizedQueue and echo each
# dequeued message back on the handle stored in the message.
class EchoServerTask
  # high_water_mark - capacity handed to the SynchronizedQueue feeding the workers.
  def initialize(high_water_mark)
    @synchronized_queue = SynchronizedQueue.new(high_water_mark)
  end

  # Returns the queue that producers insert messages into.
  def get_synchronized_queue
    @synchronized_queue
  end

  # Starts n worker threads (numbered 1..n), each running #svc_run on the
  # shared queue. The number and queue are passed through Thread.start so each
  # thread gets its own copies as block parameters.
  def activate(n)
    (1..n).each do |worker_number|
      Thread.start(worker_number, @synchronized_queue) do |number, queue|
        Thread.current["name"] = "EchoServerTask #{number}"
        svc_run(queue, number)
      end
    end
  end

  # Worker loop: takes messages from synchronized_queue forever and writes
  # "<thread object id>:<reply counter>:" followed by the message data to the
  # message's io_handle.
  #
  # synchronized_queue - object whose #get blocks until a message is available
  # i                  - worker number, used only in log output
  #
  # A failed send (SystemCallError such as Errno::EPIPE, or IOError on a closed
  # stream) is logged and the loop continues with the next message; previously
  # the exception escaped and terminated the worker thread for good, shrinking
  # the pool. The reply counter only advances after a successful send.
  def svc_run(synchronized_queue, i)
    puts("Thread #{i} started")
    n = 0
    loop do
      msg_packet = synchronized_queue.get
      puts("Task received #{msg_packet.inspect} to #{msg_packet.io_handle} as #{msg_packet.class}")
      begin
        msg_packet.io_handle.sendmsg("#{Thread.current.object_id}:#{n}:")
        msg_packet.io_handle.sendmsg(msg_packet.data)
        #msg_packet.io_handle.send(Thread.current.object_id.to_s+":"+"#{n}"+":",0)
        #msg_packet.io_handle.send(msg_packet.data,0)
        n += 1
      rescue SystemCallError, IOError => e
        puts("Thread #{i} failed to send reply: #{e.class}: #{e.message}")
      end
    end
  end
end
# Synchronized Queue | |
# Bounded FIFO queue guarded by one mutex and two condition variables:
# producers wait while the queue is full, consumers wait while it is empty.
class SynchronizedQueue
  # high_water_mark - maximum number of queued elements; #insert waits while
  #                   the queue holds this many.
  def initialize(high_water_mark)
    @high_water_mark = high_water_mark
    @list = []
    @mutex = Mutex.new
    @not_empty = ConditionVariable.new
    @not_full = ConditionVariable.new
  end

  # Appends method_request, waiting while the queue is full.
  #
  # timeout - nil (default) waits indefinitely; otherwise the maximum number of
  #           seconds to wait for free space.
  #
  # Returns a truthy value once the element is queued, or nil if the timeout
  # expired first (the element is then NOT queued). Previously a timeout merely
  # woke the loop, which immediately waited again, so the call never timed out.
  def insert(method_request, timeout = nil) #blocking
    @mutex.synchronize do
      return nil unless wait_while_i(@not_full, timeout) { full_i }
      insert_i(method_request)
    end
  end

  # Removes and returns the oldest element, waiting while the queue is empty.
  #
  # timeout - nil (default) waits indefinitely; otherwise the maximum number of
  #           seconds to wait for an element.
  #
  # Returns the element, or nil if the timeout expired first.
  def get(timeout = nil)
    @mutex.synchronize do
      return nil unless wait_while_i(@not_empty, timeout) { empty_i }
      get_i
    end
  end

  private

  # Waits on condition (with @mutex held) for as long as the given block is
  # true. Returns true when the block became false, false when timeout seconds
  # elapsed first. The loop re-checks the block after every wake-up, and each
  # wait only gets the time still remaining until the deadline.
  # Time.now is used (rather than a monotonic clock) to keep working on the
  # old Ruby versions this script targets.
  def wait_while_i(condition, timeout)
    deadline = timeout && (Time.now + timeout)
    while yield
      remaining = deadline && (deadline - Time.now)
      return false if remaining && remaining <= 0
      condition.wait(@mutex, remaining)
    end
    true
  end

  # Appends the element and wakes one waiting consumer. Caller holds @mutex.
  def insert_i(method_request)
    @list << method_request
    @not_empty.signal
  end

  # Removes the head element and, if there is now room, wakes one waiting
  # producer. Caller holds @mutex.
  def get_i
    element = @list.shift
    @not_full.signal if @list.length < @high_water_mark
    element
  end

  def empty_i
    @list.empty?
  end

  def full_i
    @list.size >= @high_water_mark
  end
end
# Reactor pattern start | |
# Base class for reactor event handlers. Every hook is a no-op returning nil;
# subclasses override the ones they need.
class EventHandler
  def initialize
    @io_handle = nil
  end

  # Hook: the handle is ready to accept a connection.
  def handle_accept_event
    nil # hook method
  end

  # Hook: the handle has data to receive.
  def handle_receive_event
    nil # hook method
  end

  # Hook invoked by EchoReactor#handle_events for readable handles registered
  # for receive events. It was missing from the base class, so a handler that
  # only overrode #handle_receive_event raised NoMethodError when dispatched.
  # Defaults to delegating to #handle_receive_event.
  def handle_input
    handle_receive_event
  end

  # Hook: the handle is ready for writing.
  def handle_write_event
    nil # hook method
  end

  # Hook: the connection is to be closed.
  #
  # handle - optional; accepted so the base signature agrees with the
  #          subclasses in this file, which all take the handle to close.
  def handle_close_event(handle = nil)
    nil # virtual, hook method
  end

  # Returns the IO handle this handler is registered under (nil in the base class).
  def get_handle
    @io_handle
  end
end
# Acceptor: owns the listening TCPServer and, for every accepted connection,
# registers a new EchoServerHandler with the reactor for receive events.
class EchoAcceptor < EventHandler
  # port             - TCP port to listen on
  # echo_server_task - passed on to every EchoServerHandler created here
  def initialize(port, echo_server_task)
    super()
    @echo_server_task = echo_server_task
    @port = port
    @io_handle = TCPServer.new(@port)
  end

  # Called back by the reactor when the listening socket is readable.
  def handle_accept_event
    client_socket = @io_handle.accept
    client_handler = EchoServerHandler.new(client_socket, @echo_server_task)
    EchoReactor.instance.register_handler(client_handler, RECEIVE_EVENT)
  end

  # Closes connection_handle, then removes it from the reactor's
  # accept-event registrations.
  def handle_close_event(connection_handle)
    connection_handle.close
    EchoReactor.instance.remove_handler(connection_handle, ACCEPT_EVENT)
  end
end
# Per-connection handler: reads from the connection, buffers partial input,
# and queues each terminated chunk as a Message for the worker pool.
class EchoServerHandler < EventHandler
  # io_handle        - the connection to read from
  # echo_server_task - provides the synchronized queue that messages go into
  def initialize(io_handle, echo_server_task)
    @data = ""
    @echo_server_task = echo_server_task
    @io_handle = io_handle
  end

  # Reads up to 100000 bytes from the connection. On a connection error, an
  # empty read or a nil read, the handler unregisters itself from the reactor
  # and returns. Otherwise the bytes are appended to the buffer, and everything
  # up to and including the last "\r", "\n" or "\r\n" is queued as one Message;
  # the unterminated remainder stays buffered for the next call.
  def handle_input
    begin
      # Only the first element of the receive result is used.
      mesg, = @io_handle.recv(100_000)
      STDOUT.puts("Received message", mesg.inspect)
    rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::ETIMEDOUT
      return unregister("Connetion abborted")
    end

    # Empty string: connection closed by the peer.
    return unregister("Connetion closed by peer side") if mesg == ""
    # nil: assume the peer disconnected.
    return unregister("Connetion disconnected by by peer side") if mesg.nil?

    @data += mesg
    cut = terminated_prefix_length
    message = @data[0, cut].to_s # everything up to the last \r, \n or \r\n
    @data = @data[cut, @data.length].to_s
    # Pack the message and put it into the shared queue.
    put(Message.new(@io_handle, message)) unless message == ""
  end

  # Inserts message into the echo server task's synchronized queue.
  def put(message)
    @echo_server_task.get_synchronized_queue.insert(message)
  end

  # Closes the given connection handle.
  def handle_close_event(handle)
    handle.close
  end

  private

  # Removes this connection from the reactor's receive registrations and
  # prints notice.
  def unregister(notice)
    EchoReactor.instance.remove_handler(@io_handle, RECEIVE_EVENT)
    puts(notice)
  end

  # Number of leading buffer characters that end in a line terminator:
  # the largest (last index + terminator length) over "\r", "\n" and "\r\n",
  # or 0 when the buffer contains none of them.
  def terminated_prefix_length
    [["\r", 1], ["\n", 1], ["\r\n", 2]].map do |terminator, width|
      position = @data.rindex(terminator)
      position.nil? ? 0 : position + width
    end.max
  end
end
# Echo reactor supports write handles, however this is not used here | |
# Reactor singleton: keeps one handle => handler table per event type, waits
# for ready handles through SynchronousEventDemultiplexer and dispatches to
# the registered handlers.
class EchoReactor
  include Singleton

  def initialize
    @mutex = Mutex.new
    @accept_callback_hash = {}
    @receive_callback_hash = {}
    @write_callback_hash = {}
    # Pipe used to wake the event loop whenever the tables are updated.
    @read_pipe, @write_pipe = IO.pipe
    puts("Generated pipe: read pipe #{@read_pipe}, write pipe #{@write_pipe}")
  end

  # Event loop: snapshot the registered handles under the mutex, wait until
  # some are ready, then dispatch the readable ones followed by the writable ones.
  def handle_events
    loop do
      watched_for_read, watched_for_write = snapshot_watched_handles
      ready_for_read, ready_for_write =
        SynchronousEventDemultiplexer.instance.select(watched_for_read << @read_pipe, watched_for_write)
      ready_for_read.each { |io| dispatch_readable(io) }
      ready_for_write.each { |io| dispatch_writable(io) }
    end
  end

  # Registers handler_obj under its handle in every table selected by the
  # event_type bit mask, then writes one byte to the wake-up pipe.
  def register_handler(handler_obj, event_type)
    io_object = handler_obj.get_handle
    @mutex.synchronize do
      tables_selected_by(event_type).each { |table| table[io_object] = handler_obj }
      @write_pipe.write("a")
      log_registrations
    end
  end

  # Removes handle from every table selected by the event_type bit mask, then
  # writes one byte to the wake-up pipe.
  def remove_handler(handle, event_type)
    @mutex.synchronize do
      puts("deleting Handle: #{handle}")
      tables_selected_by(event_type).each { |table| table.delete(handle) }
      @write_pipe.write("d")
      log_registrations
    end
  end

  private

  # Returns [handles watched for reading, handles watched for writing],
  # copied while holding the mutex.
  def snapshot_watched_handles
    @mutex.synchronize do
      [@accept_callback_hash.keys + @receive_callback_hash.keys, @write_callback_hash.keys]
    end
  end

  # Runs the accept and/or receive handler registered for a readable handle.
  # The receive table is looked up only after the accept handler has run. If
  # the handle is the wake-up pipe, one byte is drained from it.
  def dispatch_readable(io)
    puts("read_io_object.inspect: #{io.inspect}")
    puts("#{io}, #{@accept_callback_hash[io]},#{@receive_callback_hash[io]}")
    acceptor = @accept_callback_hash[io]
    acceptor.handle_accept_event unless acceptor.nil?
    receiver = @receive_callback_hash[io]
    receiver.handle_input unless receiver.nil?
    puts("@read_pipe: #{@read_pipe.inspect}")
    puts("read_io_object: #{io.inspect}")
    return unless @read_pipe == io

    # Just consume the wake-up byte; the next loop pass picks up the new tables.
    received = io.read(1)
    puts("reading io: #{io}, #{received}")
  end

  # Runs the write handler registered for a writable handle, if any.
  def dispatch_writable(io)
    puts("write_io_object.inspect: #{io.inspect}")
    puts("#{io}, #{@write_callback_hash[io]}")
    writer = @write_callback_hash[io]
    writer.handle_write_event unless writer.nil?
    puts("write_io_object: #{io.inspect}")
  end

  # Tables whose event bit is set in event_type, in accept/receive/write order.
  def tables_selected_by(event_type)
    [
      [ACCEPT_EVENT, @accept_callback_hash],
      [RECEIVE_EVENT, @receive_callback_hash],
      [WRITE_EVENT, @write_callback_hash]
    ].reject { |bit, _table| (event_type & bit) == 0 }.map { |_bit, table| table }
  end

  # Prints the handles currently registered in each table.
  def log_registrations
    puts("accept callbacks #{@accept_callback_hash.keys}")
    puts("receive callbacks #{@receive_callback_hash.keys}")
    puts("write callbacks #{@write_callback_hash.keys}")
  end
end
# Wrapper facade pattern:
# Thin singleton wrapper around IO.select.
class SynchronousEventDemultiplexer
  include Singleton

  # Waits (no timeout is passed) until at least one of the given IO objects
  # is ready.
  #
  # read_input_io_object_list  - IO objects to watch for readability
  # write_input_io_object_list - IO objects to watch for writability
  #
  # Returns [readable, writable]; the third array IO.select reports is dropped.
  def select(read_input_io_object_list, write_input_io_object_list)
    ready = IO.select(read_input_io_object_list, write_input_io_object_list)
    readable, writable = ready
    puts("read_result_array.inspect: #{readable.inspect}")
    [readable, writable]
  end
end
# Reactor pattern end | |
# Set to false to disable the periodic thread dump below.
debug = true
if debug
  Thread.current["name"] = "main"
  Thread.new do
    # For debugging purposes: periodically print every thread with its name
    # and status.
    Thread.current["name"] = "ThreadList"
    loop do
      Thread.list.each do |thread|
        puts("Threads: #{thread.inspect} : #{thread[:name]} : #{thread.status}")
      end
      # Pause between dumps. Without it this thread spun in a tight loop,
      # flooding stdout and competing with the workers and the reactor for CPU.
      sleep(1)
    end
  end
end

# Create a newline echo server on port 10001.
puts("Starting")
echo_server_task = EchoServerTask.new(20) # synchronized queue with room for 20 elements
echo_server_task.activate(5) # thread pool of 5 tasks
echo_acceptor_10001_obj = EchoAcceptor.new(10001, echo_server_task)
EchoReactor.instance.register_handler(echo_acceptor_10001_obj, ACCEPT_EVENT)
# Start the event loop (does not return).
EchoReactor.instance.handle_events
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The sendmsg method works with:
Ruby 1.9.3-p392 MRI,
but crashes Ruby threads e.g. on:
jruby 1.7.3 (1.9.3p385) 2013-02-21 dac429b on OpenJDK 64-Bit Server VM 1.6.0_27-b27 [linux-amd64]
and on
rubinius 2.0.0.rc1 (1.8.7 96db2d8e yyyy-mm-dd JI) [x86_64-unknown-linux-gnu].
It partially runs on Ruby 2.0.0-p0 MRI (tested on linux).
When the two blocking sendmsg calls in EchoServerTask#svc_run (lines 50 & 51 of the original gist) are commented out and the two send calls below them (lines 52 & 53, non-blocking) are uncommented instead, as follows, the sending thread no longer crashes.