Skip to content

Instantly share code, notes, and snippets.

@karmi
Created January 22, 2011 09:09
Show Gist options
  • Save karmi/791002 to your computer and use it in GitHub Desktop.
Save karmi/791002 to your computer and use it in GitHub Desktop.
AMQP demo in Ruby

AMQP Demo in Ruby

Download the code:

git clone git://gist.github.com/791002.git amqp-demo-ruby
cd amqp-demo-ruby

Install the gems:

gem install amqp json ansi

Run the publisher:

ruby publisher.logs.rb

Run the consumer for all logs:

ruby consumer.logs.all.rb

Run the consumer for error level logs:

ruby consumer.logs.error.rb

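The publisher sends three initial messages to the `log` exchange on startup. To publish more, type a line of the form `<level>: <message>` into the publisher's terminal, for example: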

info: HEY
debug: DEBUG
error: SOMETHING BAD HAPPENED

https://gist.github.com/791002

require 'rubygems'
require 'mq'
require 'json'
require 'ansi'
require 'time'
require 'socket'
module AMQP
  # Reports whether something accepts TCP connections on localhost:5672
  # (the standard AMQP port).
  #
  # The probe socket is closed immediately so no descriptor is leaked, and the
  # result is a real boolean rather than the socket object.
  #
  # @return [Boolean] true if the connection succeeded; false if it was
  #   refused or failed with any other system-level or name-resolution error.
  def self.local?
    TCPSocket.new('localhost', 5672).close
    true
  rescue SystemCallError, SocketError
    # SystemCallError covers Errno::ECONNREFUSED as well as timeouts and
    # unreachable-host errors; SocketError covers name-resolution failures.
    false
  end
end
# Broker host used by the publisher and the consumers: 'localhost' when
# AMQP.local? reports a listener there, otherwise 'dev.rabbitmq.com'.
HOST =
  if AMQP.local?
    'localhost'
  else
    'dev.rabbitmq.com'
  end
# Consumer script: prints every message routed to the 'log' topic exchange
# with a "log.*" routing key, one formatted line per message.

# Load the shared config (expected next to this script). Resolved relative to
# this file because Ruby 1.9.2+ no longer puts the current directory on the
# load path, so a bare `require 'config'` fails there.
require File.expand_path('../config', __FILE__)

# Exit quietly on Ctrl-C instead of dumping an Interrupt backtrace.
trap('SIGINT') { puts "\nExiting..."; exit(0) }

AMQP.start(:host => HOST) do
  puts "Receiving all log messages from '#{HOST}'", "=" * 80

  # Declare the topic exchange via MQ.topic (as the error consumer and the
  # publisher do) rather than binding to it by bare name, so the bind does not
  # depend on another process having declared the exchange first.
  MQ.queue("all logs").bind(MQ.topic('log'), :key => "log.*").subscribe do |_header, message|
    # puts "Received message:"
    # p _header
    # p message
    log = JSON.parse(message)

    # to_s keeps a payload without a 'level' key from raising on nil.upcase.
    level = log['level'].to_s
    color = case level
            when 'info'  then :green
            when 'debug' then :yellow
            when 'error' then :red
            end

    label = level.upcase.rjust(15)
    # Levels outside the three known ones are printed uncolored instead of
    # passing nil to String#ansi.
    label = label.ansi(color) if color

    time = Time.parse(log['time']).strftime('%m/%d/%Y %H:%M:%S')
    puts [label, time, log['body']].join(' | ')
  end
end
# Consumer script: prints the body of every message routed to the 'log' topic
# exchange with the "log.error" routing key and tries to raise a Growl
# notification for it.

require 'shellwords'

# Load the shared config (expected next to this script). Resolved relative to
# this file because Ruby 1.9.2+ no longer puts the current directory on the
# load path, so a bare `require 'config'` fails there.
require File.expand_path('../config', __FILE__)

# Exit quietly on Ctrl-C instead of dumping an Interrupt backtrace.
trap('SIGINT') { puts "\nExiting..."; exit(0) }

AMQP.start(:host => HOST) do
  puts "Receiving ERROR log messages from '#{HOST}'", "=" * 80

  MQ.queue("error logs").bind(MQ.topic('log'), :key => "log.error").subscribe do |message|
    # JSON.parse rather than JSON.load: the payload arrives from the broker,
    # and JSON.load is documented as meant for trusted input only.
    # to_s keeps a missing or non-string body from raising below.
    body = JSON.parse(message)["body"].to_s
    puts "ERROR: #{body}", "-" * 80

    # Best-effort desktop notification. The body is shell-escaped because it is
    # interpolated into a shell command line; unescaped, a message containing a
    # quote could run arbitrary commands on this machine.
    begin
      `growlnotify --image rabbitmq.png --name AMQP --message #{Shellwords.escape(body)} 2> /dev/null`
    rescue StandardError
      # A failed notification must not stop the consumer.
      nil
    end
  end
end
# Publisher script preamble.

# Load the shared config (expected next to this script). Resolved relative to
# this file because Ruby 1.9.2+ no longer puts the current directory on the
# load path, so a bare `require 'config'` fails there.
require File.expand_path('../config', __FILE__)

# Exit quietly on Ctrl-C instead of dumping an Interrupt backtrace.
trap('SIGINT') { puts "\nExiting..."; exit(0) }
# Publishes one log message to the 'log' topic exchange from a new thread.
#
# The message is sent as a JSON object with :level, :time (Time.now at publish
# time) and :body, using the routing key "log.<level>". Once the message has
# been handed to the exchange, the AMQP connection is stopped and, from its
# stop callback, the EventMachine reactor as well.
#
# NOTE(review): every call starts its own AMQP.start inside its own thread;
# how overlapping calls interact depends on the amqp/EventMachine internals —
# confirm before relying on concurrent publishes.
#
# @param level [Symbol, String] log level, also used as the routing-key suffix
# @param message [String] text stored under :body
# @return [Thread] the thread running the publish
def publish(level, message)
  Thread.new do
    AMQP.start(:host => HOST) do
      log_exchange = MQ.topic('log')
      routing_key = "log.#{level}"
      payload = { :level => level, :time => Time.now, :body => message }.to_json

      log_exchange.publish(payload, :key => routing_key)
      AMQP.stop { EM.stop }
      # puts "Published message '#{message}' as '#{level}'"
    end
  end
end
# Publish one sample message per level on startup.
publish(:info, 'Purring smoothly...')
publish(:debug, 'Look at this, dude.')
publish(:error, 'BOOM! BOOM!! BOOM!!')

puts "-" * 80, "Published three initial messages to '#{HOST}'. Have a look in other tabs."
puts "Publish message with <LEVEL>: <MESSAGE> command, eg. `info: This is info`", "-" * 80

# Read "<level>: <message>" lines until end of input. gets returns nil at EOF,
# so chomp is applied inside the loop rather than in the loop condition, where
# it would raise NoMethodError on nil.
while (input = gets)
  # Split on the first ': ' only, so the message itself may contain ': '.
  level, message = input.chomp.split(': ', 2)

  # Skip lines that do not carry both a level and a non-empty message.
  next if level.nil? || message.nil? || message.empty?

  publish(level, message)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment