Skip to content

Instantly share code, notes, and snippets.

@prepor
Created December 4, 2009 08:54
Show Gist options
  • Save prepor/248916 to your computer and use it in GitHub Desktop.
Save prepor/248916 to your computer and use it in GitHub Desktop.
# MogileFS client backend on EventMachine
# req = mogilefs_client.backend.get_hosts
# req.callback { |data| puts data }
module EMogileFS
@@connections = {}
class << self
def <<(request)
connection(request.client).new_task request
end
def connection(client)
@@connections[client] ||= EM.attach client.send(:socket), EMogileFS, client
end
end
attr_accessor :client, :current_task
def initialize(client)
@queue = Queue.new
@current_task = nil
self.client = client
super
end
def new_task(request)
@queue << request
handle_task
end
def task_completed
@current_task = nil
end
def handle_task
if @queue.size > 0 && @current_task.nil?
@current_task = @queue.pop
@current_task.perform
end
end
def receive_data(data)
begin
raise MogileFS::Backend::ConnectionLost unless data
if current_task && current_task.callback
begin
parsed = client.send :parse_response, data
current_task.callback.call parsed
rescue MogileFS::Backend::ChannelNotFoundError => e
puts 'not found'
end
end
ensure
task_completed
handle_task
end
end
def unbind
detach
end
end
class EMogileFS::Request
attr_accessor :request, :client, :socket, :options
def initialize(client, request)
self.client, self.request = client, request
self.socket = client.send :socket
EMogileFS << self
end
def callback(options = {}, &block)
if block
@options = options
@callback = block
else
@callback
end
end
def perform
begin
bytes_sent = socket.send request, 0
rescue SystemCallError
client.send :shutdown
raise MogileFS::UnreachableBackendError
end
unless bytes_sent == request.length then
raise MogileFS::RequestTruncatedError,
"request truncated (sent #{bytes_sent} expected #{request.length})"
end
end
end
class MogileFS::Backend
def do_request(cmd, args)
request = make_request cmd, args
EMogileFS::Request.new(self, request)
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment