Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
AMQP Server using AMQP and EM-Websockets
require 'rubygems'
require 'amqp'
require 'mongo'
require 'em-websocket'
require 'json'
# Converts a raw JSON chat frame into the symbol-keyed hash the server acts on.
class MessageParser
  # Parses +message+ (a JSON-encoded object) and builds a response hash.
  #
  # The 'status' field of the decoded object selects the result shape:
  #   'status'  => :status ('STATUS'), :username, :roomname (copied as-is)
  #   'message' => :status ('MESSAGE'), :message, :username, :roomname
  #                (whitespace-separated words of the room name joined by '_')
  # Any other 'status' value yields an empty hash.
  #
  # Example frame:
  #   {"status":"message","username":"siddharth","roomname":"harry potter","message":"hi"}
  #
  # The decoded payload is also echoed to stdout for debugging.
  def self.parse(message)
    data = JSON.parse(message)
    puts "---PARSED MESSAGE ---#{data.inspect}"

    case data['status']
    when 'status'
      {
        :status => 'STATUS',
        :username => data['username'],
        :roomname => data['roomname']
      }
    when 'message'
      {
        :status => 'MESSAGE',
        :message => data['message'],
        :username => data['username'],
        :roomname => data['roomname'].split.join('_')
      }
    else
      {}
    end
  end
end
# Helpers around the MongoDB connection used to keep per-room chat posts.
class MongoManager
  # Connects to the MongoDB server on localhost:27017 and returns the handle
  # for +database+. The handle is memoized in a class-level instance variable,
  # so every later call returns the database selected by the FIRST call,
  # whatever +database+ is passed. Also caches the 'rooms' collection.
  def self.establish_connection(database)
    @db ||= Mongo::Connection.new('localhost', 27017).db(database)
    @rooms = @db.collection('rooms')
    @db
  end

  # Stores one chat message in the collection belonging to +room_name+.
  #
  # db        - database handle as returned by establish_connection
  # room_name - human-readable room name; whitespace-separated words are
  #             joined with '_' to form the collection name
  # message   - message text to persist
  # timestamp - time value stored alongside the message
  #
  # Returns whatever the collection's +insert+ returns.
  #
  # The original body called the non-existent Array#joins (so it always raised
  # NoMethodError) and never wrote +message+/+timestamp+ anywhere.
  def self.save_message(db, room_name, message, timestamp)
    document = { 'message' => message, 'timestamp' => timestamp }
    room_collection(db, room_name).insert(document)
  end

  # Returns every stored document of the room's collection as an Array.
  #
  # NOTE(review): the WebSocket handler calls this method but no definition
  # existed; "all posts of the room" is the presumed intent - confirm.
  def self.get_room_posts(db, room_name)
    room_collection(db, room_name).find.to_a
  end

  # Maps a room name to its collection ("harry potter" -> 'harry_potter').
  def self.room_collection(db, room_name)
    db.collection(room_name.split.join('_'))
  end
  private_class_method :room_collection
end
# One {:socket => ws} entry per currently open WebSocket connection.
@sockets = []

# Bridges WebSocket clients to AMQP: each chat room is a fanout exchange and
# each user gets a queue (named after the username) bound to the room exchange.
EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  channel = AMQP::Channel.new(connection)
  puts "Connected to AMQP broker. #{AMQP::VERSION} "

  # Connect eagerly so an unreachable MongoDB is noticed at startup.
  MongoManager.establish_connection("trackertalk_development")

  EventMachine::WebSocket.start(:host => '127.0.0.1', :port => 8080) do |ws|
    socket_detail = {:socket => ws}

    ws.onopen do
      @sockets << socket_detail
    end

    ws.onmessage do |message|
      puts "Message : #{message}"

      # Frames come from untrusted clients; a malformed one must not raise
      # out of the reactor callback and take the whole server down.
      begin
        status = MessageParser.parse(message)
      rescue StandardError => e
        puts "Ignoring unparseable message: #{e.message}"
        next
      end

      # An unrecognised status yields an empty hash, so there is no room to
      # route to; drop the frame instead of calling #split on nil.
      room = status[:roomname]
      next if room.nil?

      exchange = channel.fanout(room.split.join('_'))

      # The original also fetched the room's posts here on every frame, but
      # never used the result; that call has been removed.

      if status[:status] == 'STATUS'
        # Announce the join only for STATUS frames (it used to be published
        # for every chat message as well).
        exchange.publish("#{status[:username]} has just joined #{status[:roomname]}")

        queue = channel.queue(status[:username])
        queue.unsubscribe if queue.subscribed?
        queue.bind(exchange).subscribe do |payload|
          puts "PAYLOAD : #{payload}"
          ws.send(payload)
        end
        # only after 0.8.0rc14
        #queue = channel.queue(status[:username], :durable => true)
        #AMQP::Consumer.new(channel, queue)
      elsif status[:status] == 'MESSAGE'
        # NOTE(review): username and message are client-supplied and are
        # embedded in HTML unescaped - escape them before rendering if
        # clients insert this payload as markup.
        full_message = "<b> #{status[:username]} :</b> #{status[:message]}"
        exchange.publish(full_message)
      end
    end

    ws.onclose do
      # @sockets stores the socket_detail hash, not the bare socket, so the
      # hash is what has to be removed (deleting ws never matched anything).
      @sockets.delete(socket_detail)
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.