Skip to content

Instantly share code, notes, and snippets.

@kmussel
Created November 14, 2014 15:14
Show Gist options
  • Save kmussel/41bfcd17fb2e2c8f1103 to your computer and use it in GitHub Desktop.
Save kmussel/41bfcd17fb2e2c8f1103 to your computer and use it in GitHub Desktop.
Remote Procedure Call (RPC) class built on RabbitMQ
require_relative 'rabbitmq.rb'
# Blocking request/reply (RPC) helper built on the shared Rabbitmq channel.
#
# Each instance declares its own reply queue, publishes one message carrying
# `reply_to` / `correlation_id` headers, and blocks the calling thread until a
# reply with the matching correlation id is delivered or the timeout elapses.
#
# An instance is single-use: #publish deletes the reply queue before returning.
class Rpc
  attr_reader :connection
  attr_reader :lock, :condition
  attr_accessor :response, :call_id

  DEFAULT_RETRY_TIMEOUT = 60
  # Seconds #publish waits for a reply when no 'timeout' option is given.
  DEFAULT_REPLY_TIMEOUT = 20

  # Publishes through a throw-away instance; takes the same arguments as #publish.
  #
  # @return [Object, nil] the parsed JSON reply, or nil when none was received
  def self.publish(*args)
    new.publish(*args)
  end

  # @param opts [Hash] options, looked up by String key first, then by Symbol key
  # @option opts [Integer] 'max_retries' (-1) stored; not used by the code in this class
  # @option opts [Numeric] 'retry_timeout' (DEFAULT_RETRY_TIMEOUT) stored; not used by
  #   the code in this class
  # @option opts [Numeric] 'timeout' (DEFAULT_REPLY_TIMEOUT) seconds to wait for a reply
  def initialize(opts = {})
    @lock = Mutex.new
    @condition = ConditionVariable.new
    @received = false
    @retries_left = @max_retries = read_option(opts, 'max_retries', -1)
    @retry_timeout = read_option(opts, 'retry_timeout', DEFAULT_RETRY_TIMEOUT)
    @reply_timeout = read_option(opts, 'timeout', DEFAULT_REPLY_TIMEOUT)
    @channel = Rabbitmq.instance.channel
    # Declared with an empty name; presumably the broker assigns one (verify
    # against the Rabbitmq wrapper). The resulting name is sent as reply_to.
    @queue = @channel.queue('', auto_delete: true)
    self.call_id = UUIDTools::UUID.random_create.to_s

    # A bound Method object keeps the handler attached to this instance even if
    # the client library evaluates the subscribe block with a different self.
    on_reply = method(:handle_reply)
    @queue.subscribe(ack: false) { |meta, payload| on_reply.call(meta, payload) }
  end

  # Publishes +message+ and blocks until the matching reply arrives or the
  # configured timeout elapses. The reply queue is deleted afterwards, even
  # when publishing raises.
  #
  # @param message [Object] payload handed to Rabbitmq#publish
  # @param route [String] routing key
  # @param exchange [String] exchange name
  # @return [Object, nil] the parsed JSON reply; nil when the publish call
  #   returned a falsy value or no reply arrived in time
  def publish(message = "", route = "#", exchange = "streams")
    headers = { reply_to: @queue.name, correlation_id: call_id }
    # The lock is held across the publish so the reply handler cannot signal
    # before this thread has started waiting.
    lock.synchronize do
      await_reply if Rabbitmq.instance.publish(message, route, exchange, headers)
    end
    response
  ensure
    @channel.queue_delete(@queue.name, false, false)
  end

  private

  # Returns the option stored under +name+ (String key, then Symbol key), or +default+.
  def read_option(opts, name, default)
    opts[name] || opts[name.to_sym] || default
  end

  # Subscription callback: records the reply and wakes the waiting publisher.
  # Deliveries whose correlation id does not match this call are ignored.
  def handle_reply(meta, payload)
    return unless meta.correlation_id == call_id

    parsed = JSON.parse(payload)
    lock.synchronize do
      self.response = parsed
      # Separate flag because a valid reply may itself parse to nil.
      @received = true
      condition.signal
    end
  end

  # Waits on the condition variable until a reply has been recorded or the
  # deadline passes. Must be called with +lock+ held. The loop re-checks the
  # flag so that a wakeup without a reply does not end the wait early.
  def await_reply
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @reply_timeout
    until @received
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      break if remaining <= 0

      condition.wait(lock, remaining)
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment