Create a gist now

Instantly share code, notes, and snippets.

RPC with AMQP
require "amqp"
require "fiber"
module AMQP_RPC
# Mixin that turns the including class into an RPC client over AMQP.
#
# Requests are published on the channel's default exchange with the including
# class's name as routing key. Replies are consumed from an exclusive,
# auto-delete queue declared with an empty name and are matched to the waiting
# caller through the message's correlation id.
#
# #subscribe, the first call to #response_queue, and every generated remote
# method suspend the current Fiber with Fiber.yield, so they must be called
# from inside a Fiber other than the root fiber.
module Client
  # Starts consuming replies and defines one instance method per remote method.
  #
  # The list of remote methods is itself fetched with a remote call
  # (+_provided_methods+), so this method yields the current Fiber until that
  # reply has been delivered.
  #
  # @return [Object] the list of method names returned by the remote side
  def subscribe
    response_queue.subscribe do |header, body|
      waiting = requests[header.correlation_id]
      # SECURITY: Marshal.load can instantiate arbitrary objects. Only consume
      # from a broker and peers you trust.
      waiting.resume(Marshal.load(body)) if waiting
    end
    make_method :_provided_methods
    provided_methods.each { |meth| make_method meth }
  end

  # @return [Hash] pending requests: correlation id (String) => suspended Fiber
  def requests
    @requests ||= {}
  end

  # @return [Object] memoized AMQP connection to the broker on localhost
  def connection
    @connection ||= AMQP.connect(host: 'localhost')
  end

  # @return [Object] memoized AMQP channel opened on #connection
  def channel
    @channel ||= AMQP::Channel.new(connection)
  end

  # Returns the reply queue, declaring it on first use.
  #
  # On the first call the current Fiber is suspended until the declaration
  # callback resumes it with the queue object.
  #
  # @return [Object] the queue replies are consumed from
  def response_queue
    return @response_queue if @response_queue

    waiter = Fiber.current
    channel.queue("", exclusive: true, auto_delete: true) do |queue, _declare_ok|
      waiter.resume queue
    end
    @response_queue = Fiber.yield
  end

  private

  # Defines +method+ on the including class as a blocking remote call.
  #
  # The generated method publishes the call, suspends the current Fiber and
  # returns whatever value the reply handler in #subscribe resumes it with.
  #
  # @param method [Symbol, String] name of the remote method
  def make_method(method)
    self.class.send(:define_method, method) do |*args|
      id = rand(10_000_000).to_s
      # Never reuse an id that is still in flight: doing so would overwrite
      # the waiting Fiber of the earlier request, which would then never be
      # resumed.
      id = rand(10_000_000).to_s while requests.key?(id)
      requests[id] = Fiber.current
      payload = Marshal.dump({ method: method, args: args })
      channel.default_exchange.publish(
        payload,
        routing_key: self.class.name,
        reply_to: response_queue.name,
        correlation_id: id
      )
      result = Fiber.yield
      requests.delete id
      result
    end
  end

  # @return [Object] memoized result of the remote +_provided_methods+ call
  def provided_methods
    @provided_methods ||= _provided_methods
  end
end
# Mixin that exposes selected instance methods of the including class as RPC
# endpoints over AMQP.
#
# Requests are consumed from a queue named after the including class. Each
# request body is a Marshal-dumped Hash with the keys +:method+ and +:args+.
# The Marshal-dumped return value is published on the default exchange to the
# request's +reply_to+ routing key, carrying the request's correlation id.
module Server
  # Class-level DSL added to every class that includes Server.
  module ClassMethods
    # Registers method names as remotely callable; duplicates are ignored.
    #
    # @param args [Array<Symbol>] names of instance methods to expose
    # @return [Array] all names registered so far
    def provide(*args)
      @provided_methods = provided_methods | args
    end

    # @return [Array] names registered with .provide; empty when .provide was
    #   never called, so callers can always iterate the result
    def provided_methods
      @provided_methods ||= []
    end
  end

  # Hook run on +include+: adds the ClassMethods DSL to the including class.
  def self.included(base)
    base.extend ClassMethods
  end

  # @return [Object] memoized AMQP connection to the broker on localhost
  def connection
    @connection ||= AMQP.connect(host: 'localhost')
  end

  # @return [Object] memoized AMQP channel opened on #connection
  def channel
    @channel ||= AMQP::Channel.new(connection)
  end

  # @return [Object] memoized request queue, named after the including class
  def queue
    @queue ||= channel.queue(self.class.name)
  end

  # Starts consuming requests and answering them.
  #
  # Only names registered with .provide, plus the internal
  # +_provided_methods+ discovery call, are dispatched. Any other name is
  # reported on stderr and dropped without a reply, so a message cannot invoke
  # arbitrary instance methods through +send+.
  def subscribe
    queue.subscribe do |header, body|
      # SECURITY: Marshal.load can instantiate arbitrary objects. Only consume
      # from a broker and peers you trust.
      request = Marshal.load(body)
      method = request[:method]
      unless remotely_callable?(method)
        warn "#{self.class.name}: rejected RPC call to non-provided method #{method.inspect}"
        next
      end
      # send (not public_send) because _provided_methods is private.
      result = send(method, *request[:args])
      channel.default_exchange.publish(
        Marshal.dump(result),
        routing_key: header.reply_to,
        correlation_id: header.correlation_id
      )
    end
  end

  private

  # @return [Array] names registered with .provide on the including class
  def _provided_methods
    self.class.provided_methods
  end

  # @param method [Object] method name taken from a request
  # @return [Boolean] whether a request may invoke +method+
  def remotely_callable?(method)
    method == :_provided_methods || self.class.provided_methods.include?(method)
  end
end
end
#!/usr/bin/env ruby
require "./amqp_rpc"
# Client-side Calculator: its instance methods are generated by
# AMQP_RPC::Client#subscribe, which is run as part of construction.
class Calculator
  include AMQP_RPC::Client

  # Subscribes immediately, so a new instance must be created inside a Fiber
  # (subscribe suspends the current Fiber while it waits for replies).
  def initialize
    subscribe
  end
end
# Run the client inside a Fiber within the EventMachine reactor: each remote
# call suspends that Fiber until its reply is delivered.
EM.run do
  client_fiber = Fiber.new do
    calculator = Calculator.new
    (1..50).each do |i|
      puts calculator.remote_method(i)
    end
    # NOTE(review): EM.stop does not appear to accept a block, so `exit` is
    # presumably never invoked here -- confirm against the EventMachine docs.
    EM.stop { exit }
  end
  client_fiber.resume
end
#!/usr/bin/env ruby
require "./amqp_rpc"
# Server-side Calculator: answers RPC requests addressed to "Calculator".
class Calculator
  include AMQP_RPC::Server

  # Methods that clients are allowed to discover and call.
  provide :remote_method

  # Starts consuming requests as soon as the instance is created.
  def initialize
    subscribe
  end

  # @param x [Object] any value responding to `*`
  # @return [Object] the result of `x * 2`
  def remote_method(x)
    x * 2
  end
end
# Start the reactor and create the server; constructing it subscribes to the
# request queue, after which the reactor keeps the process alive.
EM.run { Calculator.new }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment