-
-
Save anonymous/ab74a19c072d7cf08497 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require '0mq' | |
# Handle for one request sent to an Actor.
#
# Constructing an ActorCall connects a REQ socket to the actor's inproc
# endpoint and sends the request right away. The reply is collected later
# with #get (which passes +flags+ straight to the receive call) or #poll.
class ActorCall
  # @param obj [Object] object wrapped by the target Actor; its object_id
  #   names the inproc endpoint
  # @param method [String, Symbol] name of the method to invoke
  # @param args [Array] frames sent unmodified after the method name
  # @param context context in which the REQ socket is created
  #
  # NOTE(review): args are not marshaled, unlike results -- presumably they
  # must already be something the 0mq gem can send as a frame; confirm.
  def initialize(obj, method, args, context = ZMQ::DefaultContext)
    @client = ZMQ::Socket.new ZMQ::REQ, context: context
    @client.connect "inproc://actor_#{obj.object_id}"
    # Actor#run compares the method frame against the String 'stop', so the
    # name is normalized to a String here.
    @client.send_array [method.to_s, *args]
  end

  # Returns the decoded reply, receiving it first if necessary.
  #
  # @param flags receive flags handed to the socket (0 by default)
  # @return the unmarshaled result for an 'OK' reply, or nil for a 'STOPPED'
  #   reply and when the receive raised Errno::EAGAIN
  # @raise the unmarshaled exception carried by an 'EXC' reply
  def get(flags = 0)
    return res if @res

    # Only the receive is guarded: an Errno::EAGAIN carried by an 'EXC' reply
    # must be raised to the caller, not mistaken for "nothing received yet".
    begin
      @res = @client.recv_array flags
    rescue Errno::EAGAIN
      return nil
    end
    @client.close
    res
  end

  # Non-blocking variant of #get.
  def poll
    get ZMQ::NOBLOCK
  end

  private

  # Decodes the stored reply: the first frame is the status, the last frame
  # the marshaled payload. Any other status yields nil.
  def res
    case @res.first
    when 'OK'
      Marshal.load @res.last
    when 'EXC'
      raise Marshal.load @res.last
    when 'STOPPED'
      nil
    end
  end
end
# Serves a plain object from a dedicated thread behind a ZMQ REP socket bound
# to "inproc://actor_<object_id>".
#
# Methods the wrapped object responds to can be called on the actor itself;
# each such call returns an ActorCall whose #get / #poll yields the result.
class Actor
  # @param obj [Object] object whose methods are served
  # @param context context used for the server socket and for every
  #   ActorCall created by this actor
  def initialize(obj, context = ZMQ::DefaultContext)
    @obj = obj
    @context = context
    @active = false
  end

  # Spawns the serving thread.
  #
  # NOTE(review): this returns without waiting for the thread to bind its
  # socket; #active? only becomes true once that has happened, so a call made
  # immediately after #start may still raise "not active".
  #
  # @return [Actor] self
  # @raise [RuntimeError] if the actor is already active
  def start
    raise "already active" if active?

    Thread.new { run }
    self
  end

  # Sends a 'stop' request and waits for it to be acknowledged.
  #
  # @return [Actor] self
  # @raise [RuntimeError] if the actor is not active
  def stop
    raise "not active" unless active?

    ActorCall.new(@obj, 'stop', [], @context).get
    self
  end

  # @return [Boolean] whether the serving thread is currently accepting requests
  def active?
    @active
  end

  # Mirrors #method_missing: the actor answers to whatever the wrapped object
  # publicly responds to. Ruby invokes this hook with two arguments, so the
  # second parameter is required for #respond_to? and #method to work.
  def respond_to_missing?(method, include_private = false)
    @obj.respond_to?(method) || super
  end

  # Forwards a call to the wrapped object as an asynchronous request.
  #
  # @return [ActorCall] handle for the pending reply
  # @raise [NoMethodError] if the wrapped object does not respond to +method+
  # @raise [RuntimeError] if the actor is not active
  def method_missing(method, *args)
    return super unless @obj.respond_to? method
    raise "not active" unless active?

    ActorCall.new @obj, method.to_s, args, @context
  end

  private

  # Body of the serving thread: answers requests until a 'stop' request
  # arrives, acknowledges it, answers whatever is already queued, then closes
  # the socket.
  def run
    # Created in the actor's own context, the same one its ActorCalls connect
    # in; an inproc endpoint is not reachable from a different context.
    @server = ZMQ::Socket.new ZMQ::REP, context: @context
    begin
      @server.bind "inproc://actor_#{@obj.object_id}"
      @active = true
      loop do
        method, *args = @server.recv_array
        break if method == 'stop'

        handle_req(method, args)
      end
      # Cleared before acknowledging so #active? is already false by the time
      # the caller's #stop returns.
      @active = false
      @server.send_array ['STOPPED']
      drain_pending
    ensure
      # Also reached when the thread dies on an error, so the actor never
      # keeps reporting itself active with nobody serving the endpoint.
      @active = false
      @server.close
    end
  end

  # Answers requests that were queued before the actor stopped, returning
  # once a non-blocking receive reports that nothing is left.
  def drain_pending
    loop do
      method, *args = @server.recv_array ZMQ::NOBLOCK
      if method == 'stop'
        # A second 'stop' queued behind the first is acknowledged rather than
        # dispatched to the wrapped object.
        @server.send_array ['STOPPED']
      else
        handle_req(method, args)
      end
    end
  rescue Errno::EAGAIN
    nil
  end

  # Invokes +method+ on the wrapped object and replies with ['OK', result] or,
  # when a StandardError is raised, ['EXC', exception]; payloads are marshaled.
  def handle_req(method, args)
    reply =
      begin
        ['OK', Marshal.dump(@obj.send(method, *args))]
      rescue StandardError => e
        ['EXC', Marshal.dump(e)]
      end
    @server.send_array reply
  end
end
# vim:tabstop=2 shiftwidth=2 noexpandtab: | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment