Skip to content

Instantly share code, notes, and snippets.

@daixque
Created February 25, 2012 14:55
Show Gist options
  • Save daixque/1908911 to your computer and use it in GitHub Desktop.
Save daixque/1908911 to your computer and use it in GitHub Desktop.
RPC over AMQP (RabbitMQ)
require 'amqp'
class Client
def initialize
@config = {
:host => 'localhost'
}
end
def request(api, msg, reply_to)
exchange = @channel.direct 'ex.rpc'
exchange.publish(msg, :routing_key => api.to_s, :reply_to => reply_to)
end
def listen_response(queue)
exchange = @channel.direct("amq.direct")
queue.bind(exchange, :routing_key => queue.name).subscribe do |header, payload|
@connection.close {
@response = payload
EventMachine.stop
}
end
end
def call(api, msg)
AMQP.start(@config) do |connection|
@connection = connection
@channel = AMQP::Channel.new connection
@channel.queue('', :auto_delete => true) do |queue|
request api, msg, queue.name
listen_response queue
end
end
@response
end
end
ping_res = Client.new.call(:ping, "ping")
p [:ping, ping_res]
echo_res = Client.new.call(:echo, "hello")
p [:echo, echo_res]
require 'amqp'
class Server
def initialize
@config = {
:host => 'localhost'
}
@acceptable_methods = [:ping, :echo]
end
def ping(payload)
"pong"
end
def echo(payload)
payload
end
def declare_api(name)
queue = @channel.queue("rpc.#{name}", :auto_delete => true)
queue.bind(@exchange, :routing_key => name.to_s).subscribe do |header, payload|
p [:received, payload]
exchange = @channel.direct "amq.direct"
reply_to = header.reply_to
msg = self.__send__(name, payload)
exchange.publish(msg, :routing_key => reply_to)
end
end
def run
AMQP.start(@config) do |connection|
@channel = AMQP::Channel.new connection
@exchange = @channel.direct 'ex.rpc'
@acceptable_methods.each { |m| declare_api m }
stopper = Proc.new { connection.close { EventMachine.stop } }
Signal.trap "INT", stopper
end
end
end
Server.new.run
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment