Skip to content

Instantly share code, notes, and snippets.

@ntamvl
Last active February 21, 2024 11:10
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save ntamvl/8f105701e3f2e7a1bb7378454663b32f to your computer and use it in GitHub Desktop.
Save ntamvl/8f105701e3f2e7a1bb7378454663b32f to your computer and use it in GitHub Desktop.
RabbitMQ Ruby Example
# an initializer
# gem install bunny
# written by: Tam Nguyen (twitter: @nguyentamvn)
require 'bunny'
require 'json'
class RabbitPublisher
def initialize(options = {})
bunny_config = {
host: options[:host] || 'localhost',
user: options[:user],
pass: options[:pass],
heartbeat_interval: 8,
automatically_recover: true,
timeout: 7
}
rabbit_connection = Bunny.new(bunny_config)
rabbit_connection.start
rabbit_channel = rabbit_connection.create_channel
self.channel = rabbit_channel
self.connection = rabbit_connection
end
def publish(options = {})
queue_name = options[:queue_name]
message = options[:message]
if queue_name.nil? || queue_name.empty?
puts 'queue_name is missing'
return
end
queue = channel.queue(queue_name, durable: true)
begin
puts "Sending message..."
queue.publish(message.to_json, persistent: true)
rescue Exception => e
puts "[ERROR] #{e}"
close
end
end
def receive(options = {})
queue_name = options[:queue_name]
if queue_name.nil? || queue_name.empty?
puts 'queue_name is missing'
return
end
puts "Waiting queue: #{queue_name}..."
channel.prefetch(1)
begin
queue = channel.queue(queue_name, durable: true)
queue.subscribe(manual_ack: true, block: true) do |delivery_info, _properties, body|
yield channel, delivery_info, body
end
rescue Exception => e
puts "[ERROR] #{e}"
close
end
end
def close
channel.close
connection.close
puts 'Closed rabbitmq connections.'
end
private
attr_accessor :channel
attr_accessor :connection
end
# how to use
# rabbit = RabbitPublisher.new
# for publishing messages
# rabbit.publish(queue_name: "twitter", message: { name: "tam nguyen" })
# (1..10000).each {|i| rabbit.publish(queue_name: "twitter", message: { name: "tam nguyen #{i}" }); sleep 0.1 }
# for subscribing messages
# rabbit.receive(queue_name: 'twitter') do |channel, delivery_info, body|
# puts " [x] #{body}"
# channel.ack(delivery_info.delivery_tag)
# end
# should close connecttions whern completed tasks
# rabbit.close
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment