Created
February 21, 2017 23:09
-
-
Save dsisnero/32e2e0c12c70ed70da6919efe0ec1cbe to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'bundler/setup'
require 'concurrent-edge'
require 'logger'
require 'socket'
# Ask concurrent-ruby to log through a stdlib Logger, at DEBUG level.
Concurrent.use_stdlib_logger(Logger::DEBUG)
# Actor that owns a UDP socket and prints every datagram it receives.
#
# The actor never blocks on IO itself. A helper thread waits in IO.select
# until the socket is readable and then sends the actor a [:read, io]
# message. Once the actor has handled that message it hands the socket back
# to the thread through a queue, so at most one wait is pending at a time.
#
# Messages understood by #on_message:
#   :start      - bind the socket and start the waiting thread
#   :stop       - close the socket and ask the waiting thread to exit
#   [:read, io] - read the datagrams currently pending on io
#   :running?   - answers whether the server is currently started
# Anything else is passed on to the next behaviour via `pass`.
class MyUDPServer < Concurrent::Actor::Context
  DEFAULT_HOST = '127.0.0.1'.freeze
  DEFAULT_PORT = 5555
  # Largest datagram payload requested from the socket in one read.
  MAX_DATAGRAM_SIZE = 4096
  # Upper bound on datagrams handled per :read message, so that a flood of
  # traffic cannot keep the actor from processing other messages (e.g. :stop).
  MAX_READS_PER_MESSAGE = 32

  # @param host [String] local address to bind to
  # @param port [Integer] local UDP port to bind to
  def initialize(host = DEFAULT_HOST, port = DEFAULT_PORT)
    # Explicit empty parens: a bare `super` would forward host/port upwards.
    super()
    @host = host
    @port = port
    @running = false
    @thread = nil
    @server = nil
    @queue = nil
    puts "started [#{name}]"
    tell :start
  end

  # Dispatches an incoming message; see the class comment for the protocol.
  #
  # @param msg [Symbol, Array] a command, optionally followed by arguments
  # @return [Object] the result of the selected handler
  def on_message(msg)
    command, *args = msg
    case command
    when :start    then start
    when :stop     then stop
    when :read     then read(args.first)
    when :running? then @running
    else pass
    end
  end

  # Releases the socket when the actor terminates.
  #
  # The event is destructured so that both a bare :terminated symbol and an
  # array whose first element is :terminated (e.g. carrying a reason) match.
  # NOTE(review): the array form is what newer concurrent-ruby-edge releases
  # appear to broadcast - confirm against the version in use.
  #
  # @param event [Symbol, Array]
  def on_event(event)
    event_name, = event
    stop if event_name == :terminated
  end

  # Binds the UDP socket and starts the thread that waits for readability.
  # Does nothing when already running.
  #
  # @return [true]
  # @raise [SystemCallError] when the socket cannot be configured or bound
  def start
    return true if @running

    socket = UDPSocket.new
    begin
      socket.setsockopt(:SOCKET, :REUSEADDR, true)
      socket.bind(@host, @port)
    rescue StandardError
      # Do not leak the descriptor when e.g. the port is already taken.
      socket.close
      raise
    end

    @server = socket
    @queue = Queue.new
    @thread = Thread.new(@queue) { |queue| wait_for_readable(queue) }
    @queue << @server
    @running = true
  end

  # Closes the socket and tells the waiting thread to finish.
  # Does nothing when not running.
  def stop
    return true unless @running

    @server.close
    @running = false
    # stop and free the thread for GC
    @queue << :end
    @thread = nil
  end

  # Reads the datagrams currently pending on +io+ (up to
  # MAX_READS_PER_MESSAGE of them), prints each one, and then hands +io+ back
  # to the waiting thread.
  #
  # A :read message can still be in the mailbox after #stop has closed the
  # socket; such stale messages are ignored rather than re-queued.
  #
  # @param io [UDPSocket] the socket reported readable by the waiting thread
  # @return [nil]
  def read(io)
    return if !@running || io.closed?

    begin
      MAX_READS_PER_MESSAGE.times do
        data = io.recvfrom_nonblock(MAX_DATAGRAM_SIZE)
        puts "server #{name} got: #{data.inspect}"
      end
    rescue IO::WaitReadable
      # Nothing more to read right now. IO::WaitReadable covers both the
      # EAGAIN and EWOULDBLOCK variants of "would block".
    rescue IOError, Errno::EBADF => e
      # closed socket
      puts format('%s -> %s', io.inspect, e.inspect)
      return
    end

    @queue << io
    nil
  end

  # Executor the actor's message processing runs on.
  def default_executor
    # Message handling only performs non-blocking reads, so the IO executor
    # is not needed.
    Concurrent.global_fast_executor
  end

  private

  # Body of the helper thread: for every socket popped from +queue+, block
  # until it is readable and notify the actor. Exits when :end (or a falsy
  # value) is popped.
  def wait_for_readable(queue)
    Thread.current.abort_on_exception = true
    while (io = queue.pop)
      break if io == :end

      begin
        IO.select [io], []
        tell([:read, io])
      rescue IOError, Errno::EBADF => e
        # closed socket
        puts format('%s -> %s', io.inspect, e.inspect)
      end
    end
  end
end
# Demo driver: spawn the server actor, send it three datagrams from a client
# socket, then ask it to stop. The sleeps give the asynchronous actor and its
# waiting thread time to act between steps.
server = MyUDPServer.spawn!('udp1')
sleep 0.1

client = UDPSocket.new
client.send('HA! data', 0, '127.0.0.1', 5555)
sleep 0.1
2.times { client.send('HA! data', 0, '127.0.0.1', 5555) }
client.close
sleep 0.1

server.tell(:stop)
sleep 1
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment