Skip to content

Instantly share code, notes, and snippets.

@jdoconnor
Last active August 29, 2015 13:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jdoconnor/9438386 to your computer and use it in GitHub Desktop.
Save jdoconnor/9438386 to your computer and use it in GitHub Desktop.
RPC with bunny
source 'https://rubygems.org'
gem 'bunny'
gem 'hashie'
gem 'pry'
gem 'json'
class MyClient
require "bunny"
require "thread"
require "hashie"
require 'pry'
require 'json'
attr_accessor :route
attr_accessor :response, :call_id
attr_reader :lock, :condition
attr_reader :reply_queue
def initialize(options = {})
# make this connection part a singleton and a LOT of time is saved, as well as reusing the same connection
conn = Bunny.new(:automatically_recover => false)
conn.start
@ch = conn.create_channel
@defaults = Hashie::Mash.new({
server_queue: nil,
exchange: @ch.default_exchange
}.merge(options)
)
@lock = Mutex.new
@condition = ConditionVariable.new
end
def listen_for_response
# listen on a new queue for this response
@reply_queue = @ch.queue("", :exclusive => true)
@reply_queue.subscribe do |delivery_info, properties, payload|
puts "response_id #{properties[:correlation_id]}"
puts properties[:correlation_id] == self.call_id ? "correct id" : "BAD id"
if properties[:correlation_id] == self.call_id
self.response = payload
self.lock.synchronize{self.condition.signal}
end
end
end
def send_request(routing_options, method, params)
self.call_id = self.generate_uuid
data_string = {method: method, params: params}.to_json
routing_options.exchange.publish(
data_string,
routing_key: routing_options.server_queue,
correlation_id: call_id,
reply_to: @reply_queue.name)
puts "call id #{call_id}"
self.response = nil
# params to synchronize are mutex, timeout_in_seconds
lock.synchronize{condition.wait(lock, 5)}
response
end
def request(options = {})
options = Hashie::Mash.new(options)
# grab out the expected data
method = options.delete(:method)
params = options.delete(:params)
# merge the connection options with the defaults
routing_options = @defaults.merge(options)
listen_for_response
response = send_request(routing_options, method, params)
# parse and return response
Hashie::Mash.new(JSON.parse(response))
end
def generate_uuid
# very naive but good enough for code
# examples
"#{rand}#{rand}#{rand}"
end
end
#!/usr/bin/env ruby
# encoding: utf-8
require "bunny"
require 'hashie'
require 'json'
require 'pry'
conn = Bunny.new(:automatically_recover => false)
conn.start
ch = conn.create_channel
class FibonacciServer
def initialize(ch)
@ch = ch
end
def start(queue_name)
@q = @ch.queue(queue_name, durable: true)
@x = @ch.default_exchange
@q.subscribe(:block => true, ack: true) do |delivery_info, properties, payload|
req = Hashie::Mash.new(JSON.parse(payload))
if req[:method] == 'fib'
n = req.params.number.to_i
r = self.class.fib(n)
puts " [.] fib(#{n})"
data_string = {data: { value: r } }.to_json
@x.publish(data_string, :routing_key => properties.reply_to, :correlation_id => properties.correlation_id)
@ch.acknowledge(delivery_info.delivery_tag, false)
end
end
end
def self.fib(n)
case n
when 0 then 0
when 1 then 1
else
fib(n - 1) + fib(n - 2)
end
end
end
begin
server = FibonacciServer.new(ch)
" [x] Awaiting RPC requests"
server.start("rpc_queue")
rescue Interrupt => _
ch.close
conn.close
exit(0)
end
# DSL I WANT to use
# route = queue: 'user_service'
# client = MyClient.new
# response = client.request(route: route, method: 'show', params: {})
# response.data = { some: hash_value }
# response.error? # => false
require './my_client'
client = MyClient.new(server_queue: 'rpc_queue')
(1..100).each do |c|
response = client.request(method: :fib, params: { number: 20 })
puts "i got this #{response.data.value}"
end
@jdoconnor
Copy link
Author

a couple of small modifications to https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/ruby

  • make the server queue durable
  • explicitly ack messages from the server
  • timeout on the client if the server is non-responsive
  • have the client loop for a little bit
  • print the correlation_ids to the console, as well as noting if an ID is incorrect

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment