Skip to content

Instantly share code, notes, and snippets.

@mattdenner
Created March 27, 2010 10:54
Show Gist options
  • Save mattdenner/345931 to your computer and use it in GitHub Desktop.
Save mattdenner/345931 to your computer and use it in GitHub Desktop.
RabbitMQ publisher & subscriber in Ruby
#!/usr/bin/env ruby
require 'rubygems'
require 'amqp'
require 'mq'
require 'uuid'
exchange_type = ARGV.first or raise "Usage: publisher <exchange type>"
[ :direct, :fanout, :topic ].include?(exchange_type.to_sym) or raise "You can only use direct, fanout or topic"
KEYS = [ 'even', 'odd' ]
puts "Starting the #{ exchange_type } publisher ..."
AMQP.start(:host => 'localhost') do
amq = MQ.new
puts "Connecting ..."
exchange = amq.method(exchange_type.to_sym).call('exchange') # Can't use send() here!
marker = UUID.new.generate
puts "Sending marker #{ marker } ..."
exchange.publish(marker)
puts "Sending messages to #{ exchange.name } ..."
(1..100).each { |i| exchange.publish(i, :routing_key => KEYS[ i % 2 ]) }
puts "Shutting down ..."
AMQP.stop { EM.stop }
puts "Done"
end
#!/usr/bin/env ruby
require 'rubygems'
require 'amqp'
require 'mq'
require 'uuid'
exchange_type = ARGV.first or raise "Usage: subscriber <exchange type>"
[ :direct, :fanout, :topic ].include?(exchange_type.to_sym) or raise "You can only use direct, fanout or topic"
puts "Starting the #{ exchange_type } subscriber ..."
AMQP.start(:host => 'localhost') do
amq = MQ.new
puts "Connecting ..."
options = { }
options[ :key ] = ARGV.last unless ARGV.length == 1
exchange = amq.method(exchange_type.to_sym).call('exchange') # Can't use send() here!
queue_name = (exchange_type.to_sym == :direct) ? 'exchange' : UUID.new.generate
queue = amq.queue(queue_name).bind(exchange, options)
puts "Subscribing for messages on #{ queue.name } ..."
queue.subscribe(:ack => true) do |header,message|
puts "\tMessage: #{ message }" }
header.ack
end
puts "Waiting for messages ..."
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment