Instantly share code, notes, and snippets.

Embed
What would you like to do?
AMQP Server using AMQP and EM-Websockets
require 'rubygems'
require 'amqp'
require 'mongo'
require 'em-websocket'
require 'json'
class MessageParser
# message format => "room:harry_potter, nickname:siddharth, room:members"
def self.parse(message)
parsed_message = JSON.parse(message)
puts "---PARSED MESSAGE ---" + parsed_message.inspect
response = {}
if parsed_message['status'] == 'status'
response[:status] = 'STATUS'
response[:username] = parsed_message['username']
response[:roomname] = parsed_message['roomname']
elsif parsed_message['status'] == 'message'
response[:status] = 'MESSAGE'
response[:message] = parsed_message['message']
response[:username] = parsed_message['username']
response[:roomname] = parsed_message['roomname'].split().join('_')
end
response
end
end
class MongoManager
def self.establish_connection(database)
@db ||= Mongo::Connection.new('localhost', 27017).db(database)
@rooms = @db.collection('rooms')
@db
end
def self.save_message(db, room_name, message, timestamp)
posts ||= db.collection("#{room_name.split().joins('_')}")
end
end
@sockets = []
EventMachine.run do
connection = AMQP.connect(:host => '127.0.0.1')
channel = AMQP::Channel.new(connection)
puts "Connected to AMQP broker. #{AMQP::VERSION} "
mongo = MongoManager.establish_connection("trackertalk_development")
EventMachine::WebSocket.start(:host => '127.0.0.1', :port => 8080) do |ws|
socket_detail = {:socket => ws}
ws.onopen do
@sockets << socket_detail
end
ws.onmessage do |message|
puts "Message : #{message}"
status = MessageParser.parse(message)
exchange = channel.fanout(status[:roomname].split().join('_'))
exchange.publish("#{status[:username]} has just joined #{status[:roomname]}")
posts = MongoManager.get_room_posts(mongo, status[:roomname])
if status[:status] == 'STATUS'
queue = channel.queue(status[:username])
queue.unsubscribe if queue.subscribed?
queue.bind(exchange).subscribe do |payload|
puts "PAYLOAD : #{payload}"
ws.send(payload)
end
# only after 0.8.0rc14
#queue = channel.queue(status[:username], :durable => true)
#AMQP::Consumer.new(channel, queue)
elsif status[:status] == 'MESSAGE'
full_message = "<b> #{status[:username]} :</b> #{status[:message]}"
exchange.publish(full_message)
end
end
ws.onclose do
@sockets.delete ws
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment