Skip to content

Instantly share code, notes, and snippets.

@headius
Created December 29, 2014 20:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save headius/0d14997b4758bf0cb128 to your computer and use it in GitHub Desktop.
Save headius/0d14997b4758bf0cb128 to your computer and use it in GitHub Desktop.
require "singleton"
require "socket"
# Event-type bit flags. Each flag occupies its own bit, so the reactor can
# test a registration's event type against every flag with a bitwise AND.
ACCEPT_EVENT  = 1 << 0 # 0x01
RECEIVE_EVENT = 1 << 1 # 0x02
WRITE_EVENT   = 1 << 2 # 0x04
# Further flags would continue the bit sequence, e.g.:
# Y_EVENT = 1 << 3 (0x08)
# Z_EVENT = 1 << 4 (0x10)
# Etc.
#
# Half-Sync/Half-Async pattern,
# based on the Reactor and Active Object patterns from the POSA2 book:
# http://www.dre.vanderbilt.edu/~schmidt/PDF/HS-HA.pdf
# http://www.dre.vanderbilt.edu/~schmidt/PDF/Active-Objects.pdf
# http://www.dre.vanderbilt.edu/~schmidt/PDF/Reactor.pdf
# http://www.dre.vanderbilt.edu/~schmidt/PDF/Acc-Con.pdf
# Read-only holder that pairs a piece of received data with the IO handle
# it belongs to.
class Message
  # The IO handle associated with this message.
  attr_reader :io_handle
  # The message payload.
  attr_reader :data

  # io_handle:: IO handle to associate with the payload
  # data::      the payload itself
  def initialize(io_handle, data)
    @io_handle, @data = io_handle, data
  end
end
# Pool of worker threads that take message packets from a shared
# SynchronizedQueue and echo each one back on the IO handle it carries.
class EchoServerTask
  # high_water_mark:: capacity handed to the SynchronizedQueue.
  def initialize(high_water_mark)
    @synchronized_queue = SynchronizedQueue.new(high_water_mark)
  end

  # Returns the queue that producers insert message packets into.
  def get_synchronized_queue
    @synchronized_queue
  end

  # Starts +n+ worker threads, each running #svc_run on the shared queue.
  # The worker number and queue are passed through Thread.start so every
  # thread gets its own copies (the old `for` loop shared one loop variable).
  def activate(n)
    (1..n).each do |index|
      Thread.start(index, @synchronized_queue) do |worker_id, queue|
        Thread.current["name"] = "EchoServerTask #{worker_id}"
        svc_run(queue, worker_id)
      end
    end
  end

  # Worker loop: blocks on the queue, then writes
  # "<thread object id>:<messages sent by this worker>:" followed by the
  # packet data to the packet's IO handle.
  #
  # synchronized_queue:: object whose #get returns the next packet
  # i::                  worker number, used only for logging
  #
  # A failed write (peer gone, handle closed, ...) is reported on stderr and
  # the packet is dropped; the worker keeps serving other connections
  # instead of dying and silently shrinking the pool.
  def svc_run(synchronized_queue, i)
    puts("Thread #{i} started")
    n = 0
    loop do
      msg_packet = synchronized_queue.get
      puts("Task received #{msg_packet.inspect} to #{msg_packet.io_handle} as #{msg_packet.class}")
      begin
        msg_packet.io_handle.sendmsg("#{Thread.current.object_id}:#{n}:")
        msg_packet.io_handle.sendmsg(msg_packet.data)
        n += 1 # counts successfully echoed messages only
      rescue SystemCallError, IOError => e
        warn("Thread #{i}: dropping reply to #{msg_packet.io_handle}: #{e.class}: #{e.message}")
      end
    end
  end
end
# Synchronized Queue
#
# Bounded FIFO queue guarded by a mutex and two condition variables.
# #insert blocks while the queue holds +high_water_mark+ items, #get blocks
# while it is empty.
class SynchronizedQueue
  # high_water_mark:: maximum number of queued items
  def initialize(high_water_mark)
    @high_water_mark = high_water_mark
    @list = []
    @mutex = Mutex.new
    @not_empty = ConditionVariable.new
    @not_full = ConditionVariable.new
  end

  # Appends +method_request+, waiting while the queue is full.
  #
  # timeout:: nil waits indefinitely; otherwise the maximum number of
  #           seconds to wait for free space.
  #
  # Returns true once the item is queued, or false if +timeout+ elapsed
  # first. (Previously the timeout only caused a re-wait, so the call could
  # never give up.)
  def insert(method_request, timeout = nil) # blocking
    @mutex.synchronize do
      return false unless wait_while(@not_full, timeout) { full_i }
      insert_i(method_request)
      true
    end
  end

  # Removes and returns the oldest item, waiting while the queue is empty.
  #
  # timeout:: nil waits indefinitely; otherwise the maximum number of
  #           seconds to wait for an item.
  #
  # Returns nil if +timeout+ elapsed before an item arrived.
  def get(timeout = nil)
    @mutex.synchronize do
      return nil unless wait_while(@not_empty, timeout) { empty_i }
      get_i
    end
  end

  private

  # Waits on +condition+ (with @mutex held) for as long as the given block
  # is true. Returns true when the block turned false, false when +timeout+
  # seconds passed first. The deadline is fixed once so that spurious or
  # repeated wake-ups do not extend the total wait. Time.now is used to stay
  # compatible with older Ruby versions.
  def wait_while(condition, timeout)
    deadline = timeout && (Time.now + timeout)
    while yield
      remaining = deadline && (deadline - Time.now)
      return false if remaining && remaining <= 0
      condition.wait(@mutex, remaining)
    end
    true
  end

  # Appends the item and wakes one waiting consumer. Caller holds @mutex.
  def insert_i(method_request)
    @list << method_request
    @not_empty.signal
  end

  # Removes the head item and wakes one waiting producer. Caller holds @mutex.
  def get_i
    element = @list.shift
    @not_full.signal if @list.length < @high_water_mark
    element
  end

  def empty_i
    @list.empty?
  end

  def full_i
    @list.size >= @high_water_mark
  end
end
# Reactor pattern start
# Base class for reactor event handlers. Every hook is a no-op returning
# nil; subclasses override the ones they need.
class EventHandler
  def initialize
    @io_handle = nil
  end

  # Hook: the handle has a connection ready to accept.
  def handle_accept_event
    nil
  end

  # Hook: the handle has data ready to receive.
  def handle_receive_event
    nil
  end

  # Entry point EchoReactor calls for receive-registered handles. Delegates
  # to #handle_receive_event so a subclass overriding either name works.
  def handle_input
    handle_receive_event
  end

  # Hook: the handle is ready for writing.
  def handle_write_event
    nil
  end

  # Hook: the handle is being closed. +handle+ is optional so the signature
  # is compatible with the subclasses, which take the handle to close.
  def handle_close_event(handle = nil)
    nil
  end

  # Returns the IO handle this handler is registered under.
  def get_handle
    @io_handle
  end
end
# Accept-side handler: owns the listening TCP socket and registers a new
# EchoServerHandler with the reactor for every accepted connection.
class EchoAcceptor < EventHandler
  # port::             TCP port to listen on
  # echo_server_task:: task passed on to each EchoServerHandler created here
  def initialize(port, echo_server_task)
    super()
    @port, @echo_server_task = port, echo_server_task
    @io_handle = TCPServer.new(port)
  end

  # Reactor callback: accepts the pending connection and registers a
  # receive handler for it.
  def handle_accept_event
    handler = EchoServerHandler.new(@io_handle.accept, @echo_server_task)
    EchoReactor.instance.register_handler(handler, RECEIVE_EVENT)
  end

  # Closes +connection_handle+ and removes its accept registration.
  def handle_close_event(connection_handle)
    connection_handle.close
    EchoReactor.instance.remove_handler(connection_handle, ACCEPT_EVENT)
  end
end
# Per-connection handler: buffers received bytes and forwards everything up
# to the last line terminator ("\r", "\n" or "\r\n") to the worker queue.
class EchoServerHandler < EventHandler
  # io_handle::        the connection to read from
  # echo_server_task:: task whose synchronized queue receives the messages
  def initialize(io_handle, echo_server_task)
    @io_handle = io_handle
    @echo_server_task = echo_server_task
    @data = "" # bytes received so far that are not yet terminated
  end

  # Reactor callback: reads what is available and queues the terminated
  # part. An empty or nil read, or a connection error, unregisters this
  # handle from the reactor instead.
  def handle_input
    begin
      chunk, = @io_handle.recv(100_000)
      STDOUT.puts("Received message", chunk.inspect)
    rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::ETIMEDOUT
      return unregister("Connetion abborted")
    end
    # An empty read means the peer closed the connection.
    return unregister("Connetion closed by peer side") if chunk == ""
    # Treat nil as a disconnect from the peer side as well.
    return unregister("Connetion disconnected by by peer side") if chunk.nil?

    @data += chunk
    message = take_terminated_prefix
    # Pack the message and put it into the shared queue.
    put(Message.new(@io_handle, message)) if message != ""
  end

  # Inserts +message+ into the task's synchronized queue.
  def put(message)
    @echo_server_task.get_synchronized_queue.insert(message)
  end

  # Closes the given connection handle.
  def handle_close_event(handle)
    handle.close
  end

  private

  # Removes this handle's receive registration, logs +note+, returns nil.
  def unregister(note)
    EchoReactor.instance.remove_handler(@io_handle, RECEIVE_EVENT)
    puts(note)
    nil
  end

  # Cuts the buffer just after the last terminator found and returns the
  # part before the cut ("" when the buffer holds no terminator). The rest
  # stays buffered for the next read.
  def take_terminated_prefix
    cut = ["\r", "\n", "\r\n"].map { |terminator|
      found = @data.rindex(terminator)
      found.nil? ? 0 : found + terminator.length
    }.max
    head = @data[0, cut].to_s
    @data = @data[cut, @data.length].to_s
    head
  end
end
# The echo reactor supports write handles; however, they are not used here.
# Singleton reactor: keeps one handle => handler table per event type,
# selects on all registered handles and dispatches to the matching handler.
class EchoReactor
  include Singleton

  def initialize
    @mutex = Mutex.new
    @accept_callback_hash = {}
    @receive_callback_hash = {}
    @write_callback_hash = {}
    # Pipe used to signal updates to the callback tables: each registration
    # change writes one byte, and the read end is part of every select.
    @read_pipe, @write_pipe = IO.pipe
    puts("Generated pipe: read pipe #{@read_pipe}, write pipe #{@write_pipe}")
  end

  # Event loop: snapshot the registered handles under the mutex, select on
  # them (plus the pipe's read end) and dispatch whatever became ready.
  def handle_events
    loop do
      acceptors, receivers, writers = @mutex.synchronize do
        [@accept_callback_hash.keys, @receive_callback_hash.keys, @write_callback_hash.keys]
      end
      readable, writable = SynchronousEventDemultiplexer.instance.select(
        (acceptors + receivers) << @read_pipe, writers
      )
      readable.each { |io| dispatch_readable(io) }
      writable.each { |io| dispatch_writable(io) }
    end
  end

  # Registers +handler_obj+ under its own handle for every event flag set
  # in +event_type+, then writes a byte to the pipe.
  def register_handler(handler_obj, event_type)
    io_object = handler_obj.get_handle
    @mutex.synchronize do
      callback_tables.each do |flag, table|
        table[io_object] = handler_obj unless (event_type & flag) == 0
      end
      @write_pipe.write("a")
      report_callbacks
    end
  end

  # Drops the registration of +handle+ for every event flag set in
  # +event_type+, then writes a byte to the pipe.
  def remove_handler(handle, event_type)
    @mutex.synchronize do
      puts("deleting Handle: #{handle}")
      callback_tables.each do |flag, table|
        table.delete(handle) unless (event_type & flag) == 0
      end
      @write_pipe.write("d")
      report_callbacks
    end
  end

  private

  # Pairs each event flag with the table that stores its handlers.
  def callback_tables
    [
      [ACCEPT_EVENT, @accept_callback_hash],
      [RECEIVE_EVENT, @receive_callback_hash],
      [WRITE_EVENT, @write_callback_hash]
    ]
  end

  # Prints the handles currently registered in each table.
  def report_callbacks
    puts("accept callbacks #{@accept_callback_hash.keys}")
    puts("receive callbacks #{@receive_callback_hash.keys}")
    puts("write callbacks #{@write_callback_hash.keys}")
  end

  # Runs the accept and then the receive handler registered for +io+ (if
  # any); when +io+ is the pipe's read end, consumes one byte from it.
  def dispatch_readable(io)
    puts("read_io_object.inspect: #{io.inspect}")
    puts("#{io}, #{@accept_callback_hash[io]},#{@receive_callback_hash[io]}")
    acceptor = @accept_callback_hash[io]
    acceptor.handle_accept_event unless acceptor.nil?
    # Looked up only after the accept handler ran, as the tables may change.
    receiver = @receive_callback_hash[io]
    receiver.handle_input unless receiver.nil?
    puts("@read_pipe: #{@read_pipe.inspect}")
    puts("read_io_object: #{io.inspect}")
    return unless @read_pipe == io

    # Just discard the byte; the next loop pass picks up the new tables.
    received = io.read(1)
    puts("reading io: #{io}, #{received}")
  end

  # Runs the write handler registered for +io+, if any.
  def dispatch_writable(io)
    puts("write_io_object.inspect: #{io.inspect}")
    puts("#{io}, #{@write_callback_hash[io]}")
    writer = @write_callback_hash[io]
    writer.handle_write_event unless writer.nil?
    puts("write_io_object: #{io.inspect}")
  end
end
# Wrapper Facade pattern:
# Singleton wrapper around IO.select.
class SynchronousEventDemultiplexer
  include Singleton

  # Calls IO.select on the two handle lists, with no timeout argument.
  #
  # Returns a two-element array: [handles ready for reading, handles ready
  # for writing]. The error list from IO.select is discarded.
  def select(read_input_io_object_list, write_input_io_object_list)
    ready = IO.select(read_input_io_object_list, write_input_io_object_list)
    readable, writable = ready
    puts("read_result_array.inspect: #{readable.inspect}")
    [readable, writable]
  end
end
# Reactor pattern end
# Bootstrap: optional thread-dump debugging, then start the echo server.
debug = true
if debug
  Thread.current["name"] = "main"
  Thread.new do
    # For debugging purposes: periodically list every thread and its status.
    Thread.current["name"] = "ThreadList"
    loop do
      Thread.list.each do |thread|
        puts("Threads: #{thread.inspect} : #{thread[:name]} : #{thread.status}")
      end
      # Pause between dumps; without it this loop spins continuously and
      # floods stdout.
      sleep(1)
    end
  end
end
# Create a newline echo server on port 10001.
puts("Starting")
echo_server_task = EchoServerTask.new(20) # synchronized queue with room for 20 elements
echo_server_task.activate(5) # thread pool of 5 tasks
echo_acceptor_10001_obj = EchoAcceptor.new(10001, echo_server_task)
EchoReactor.instance.register_handler(echo_acceptor_10001_obj, ACCEPT_EVENT)
# Start the event loop (does not return).
EchoReactor.instance.handle_events
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment