Skip to content

Instantly share code, notes, and snippets.

@webcoyote
Last active December 21, 2015 03:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save webcoyote/6245342 to your computer and use it in GitHub Desktop.
Save webcoyote/6245342 to your computer and use it in GitHub Desktop.
Single connection to remote server using celluloid
require 'celluloid/io'
class Connection
include Celluloid::IO
PACK_LENGTH = 'S>' # unsigned big-endian
def initialize (host, port)
@transactions = {}
@host = host
@port = port
async.run
end
def send_rpc (actor, id, msg)
if @connection != nil
@transactions[id] = actor
send(msg)
else
actor.mailbox << nil
end
end
private
def run
loop do
begin
connect
handle_messages
rescue
disconnect
end
end
end
def connect
loop do
begin
@connection = Celluloid::IO::TCPSocket.new(@host, @port)
return
rescue
sleep 5
end
end
end
def disconnect
@connection.close
@connection = nil
# ... for each transaction that is outstanding, abort it
end
def handle_messages
loop do
msg = tcp_receive
id = # extract id from msg
actor = transactions[id]
actor.mailbox << msg
transactions.remove(id)
end
end
def send(msg)
@connection.write([msg.length].pack(PACK_LENGTH) + msg)
end
def tcp_receive
length = @connection.read(2).unpack(PACK_LENGTH)[0]
msg = @connection.read(length)
end
end
class Manager
def initialize(host, port)
# Create a shared connection for all transactions
@conn = Connection.new(host, port)
end
def send_transaction (msg)
id = # get next transaction id
msg = # insert id into msg
# Send message over shared socket
@conn.send_rpc(Actor.current, id, msg)
# Wait for response
reply = Actor.current.receive
result = # parse reply
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment