Skip to content

Instantly share code, notes, and snippets.

@gauravsaini23
Created March 12, 2013 12:19
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gauravsaini23/5142467 to your computer and use it in GitHub Desktop.
Save gauravsaini23/5142467 to your computer and use it in GitHub Desktop.
amqp implementation
require 'yaml'
######################
## SET CONFIGURATION
#####################
# Java location exported into the process environment before the rest of the
# script is loaded.
# NOTE(review): presumably needed by the HBase connection code — confirm.
ENV['JAVA_HOME'] = '/usr/lib/jvm/java-6-openjdk-i386/'

# The environment name (first CLI argument) selects the section read from every
# YAML config file used by this script, so validate it before touching any file.
unless ARGV[0]
  puts "Please pass on the environment param: development | test | production"
  abort
end

# Settings for the requested environment, read from config/instances/hcl.yml.
APP_CONFIG = YAML.load_file(File.expand_path("./../../../config/instances/hcl.yml", __FILE__))[ARGV[0].to_s]

# Fail fast with a clear message when the file has no section for the given
# environment, instead of crashing later on APP_CONFIG being nil.
abort "No configuration found for environment '#{ARGV[0]}' in config/instances/hcl.yml" unless APP_CONFIG
###################
## REQUIRE FILES
###################
require 'amqp'
require 'em-websocket'
require 'json'
require 'evma_httpserver'
require "active_record"
require 'cgi'
require 'uri'
require 'logger'
require File.expand_path('../../import_classes.rb', __FILE__)
require File.expand_path('../../hbase_connection.rb', __FILE__)
require File.expand_path('../helper.rb', __FILE__)
require File.expand_path('../socket_manager', __FILE__)
require File.expand_path('../../../config/initializers/satorirecord_base', __FILE__)
require File.expand_path('../../../app/models/chat_user', __FILE__)
require File.expand_path('../../../app/models/chathistory', __FILE__)
require File.expand_path('../../../app/models/follower', __FILE__)
require File.expand_path('../../../app/models/securitygroups', __FILE__)
require File.expand_path('../../../app/models/permission_group', __FILE__)
require File.expand_path('../../../app/models/permission_group_user', __FILE__)
require File.expand_path('../../../app/models/live_page', __FILE__)
require File.expand_path('../../../app/models/community', __FILE__)
require File.expand_path('../../../app/models/community_permission', __FILE__)
require File.expand_path('../../../app/models/workspace_permission', __FILE__)
require File.expand_path('../../../app/models/htable_connection_detail', __FILE__)
# Append-mode handle on log/chat_server.log (path resolved relative to this file).
log_file = File.open(File.expand_path('../../../log/chat_server.log', __FILE__), "a")
# A File opened this way buffers its writes; switch to synchronous mode so each
# log entry reaches the file immediately and is not lost if the process dies.
log_file.sync = true
logger = Logger.new log_file
## Bare stand-in for the application's user model. The existing model cannot be
## loaded here because it requires other classes like ajax_ful_rater, which
## further require action view.
class User < ActiveRecord::Base; end
##########################
## ESTABLISH CONNECTIONS
##########################
# Environment name given on the command line; it selects the config section.
environment = ARGV[0].to_s
# Connect ActiveRecord using the matching section of config/database.yml.
database_settings = YAML.load_file(File.expand_path("./../../../config/database.yml", __FILE__))
ActiveRecord::Base.establish_connection(database_settings[environment])
# Instantiate HbaseConnection::Base for the same environment.
HbaseConnection::Base.new(environment)
# start the run loop
EventMachine.run do
# RabbitMQ settings are read from the chat webserver section of APP_CONFIG.
rabbitmq = APP_CONFIG[:webservers][:chat][:rabbitmq]
connection = AMQP.connect(
  :host => rabbitmq[:host],
  :port => rabbitmq[:port],
  :user => rabbitmq[:username],
  :password => rabbitmq[:password]
)
# Single AMQP channel used by all websocket handlers registered in this run loop.
channel = AMQP::Channel.new(connection)
socket_manager = SocketManager.new
puts "---- Server started in #{ARGV[0]} mode ----"
EventMachine::WebSocket.start(:host => APP_CONFIG[:webservers][:chat][:host], :port => APP_CONFIG[:webservers][:chat][:port]) do |ws|
# Per-connection state shared by the handlers of this websocket.
timer = nil # periodic ping timer, created when the socket opens
count = 0   # incremented on every ping tick, decremented on every PONG message
# Runs when a websocket opens: logs the connection, records the user's page,
# registers the socket and starts a 20-second ping timer.
ws.onopen do
  # Logger#<< writes the raw string with no newline or formatting, which makes
  # successive entries run together; #info keeps one entry per line.
  logger.info("Connection established : #{ws.inspect}")
  query = ws.request["query"]
  username = query["username"]
  raw_page = query["page"]
  ChatUser.update_page(username, CGI.unescapeHTML(raw_page)) if ChatUser.should_record_page?(raw_page)
  SocketManager.new.add_socket(username, ws)
  timer = EM.add_periodic_timer(20) do
    # send a ping request
    ws.send JSON.generate({type: "PING", username: username})
    # if count is greater than 5 log the user out and mark him as offline
    if count > 5
      ChatUser.remove_page(username, CGI.unescapeHTML(raw_page))
      SocketManager.new.remove_socket(username, ws)
      # Only announce "offline" when this was the user's last open socket.
      unless SocketManager.new.any_socket_exists?(username)
        ChatUser.update_status(username, false)
        ChatUser.remove_all_pages(username)
        exchange = channel.direct("")
        full_name = Helper.get_full_name(username)
        # The payload is identical for every recipient, so serialise it once.
        offline_json = JSON.generate({username: username, status: 'offline', type: 'STATUS', full_name: full_name})
        Helper.get_users_to_send(username).each do |name|
          exchange.publish(offline_json, :routing_key => name)
        end
      end
      # cancel the timer
      timer.cancel
    end
    # increment the count
    count += 1
  end
end
# Handles one incoming websocket message. The parsed request's :type selects
# the action: STATUS (fan a presence update out to interested users), MESSAGE
# (store and deliver a chat message), HISTORY (return stored chat history) or
# PONG (acknowledge a ping).
ws.onmessage do |message|
  req = Helper.parse(message)
  exchange = channel.direct("")
  case req[:type]
  when 'STATUS'
    sender = req[:username]
    queue = channel.queue!(sender)
    queue.unsubscribe if queue.subscribed?
    # Forward every payload from the user's queue to all of the user's sockets.
    # The lookup may return nil when no socket is registered for the user, so
    # fall back to an empty list instead of raising inside the event loop.
    queue.subscribe do |payload|
      (SocketManager.new.sockets[sender] || []).each { |socket| socket.send(payload) }
    end
    AMQP::Consumer.new(channel, queue)
    # add status of user into activerecord database, so that it can be used by main application
    ChatUser.update_status(sender, (req[:status] == 'online' ? 1 : 0))
    # get the page on which this user is and send the status to the concerned user
    req[:full_name] = Helper.get_full_name(sender)
    exchange.publish(JSON.generate(req), :routing_key => sender)

    # Followers and followings each get the same payload with their own tag.
    req[:user_type] = "FOLLOWING"
    json_following = JSON.generate(req)
    req[:user_type] = "FOLLOWER"
    json_follower = JSON.generate(req)
    Follower.online_followers(sender).each do |follower|
      exchange.publish(json_following, :routing_key => follower.user_id)
    end
    Follower.online_followings(sender).each do |following|
      exchange.publish(json_follower, :routing_key => following.user_id)
    end

    # Users whose profile, community or workspace pages the sender is on.
    profile_of, community_of, workspace_of = ChatUser.on_pages_of(sender)
    req[:user_type] = "PROFILE_VIEWER"
    json_profile_viewer = JSON.generate(req)
    profile_of.each { |name| exchange.publish(json_profile_viewer, :routing_key => name) }
    community_of.each do |comm, names|
      req[:user_type] = "COMM_VIEWER"
      req[:comm_name] = comm
      json_comm_viewer = JSON.generate(req)
      names.each { |name| exchange.publish(json_comm_viewer, :routing_key => name) }
    end
    workspace_of.each do |work|
      req[:user_type] = "WORK_VIEWER"
      req[:comm_name] = work[:comm_url]
      req[:work_name] = work[:work_url]
      json_work_viewer = JSON.generate(req)
      work[:users].each { |name| exchange.publish(json_work_viewer, :routing_key => name) }
    end

    helper = Helper.new
    helper.users_on_my_communities(sender).each do |comm_name, names|
      req[:user_type] = "COMM_ADMIN"
      req[:comm_name] = comm_name
      # Same payload for every recipient of this community: serialise once.
      json_comm_admin = JSON.generate(req)
      names.each { |name| exchange.publish(json_comm_admin, :routing_key => name) }
    end
    helper.users_on_my_workspaces(sender).each do |my_hash|
      req[:user_type] = "WORK_ADMIN"
      req[:comm_name] = my_hash[:comm_url]
      req[:work_name] = my_hash[:work_url]
      # Serialised once per workspace; #to_json is kept as in the original call.
      json_work_admin = req.to_json
      my_hash[:users].each { |name| exchange.publish(json_work_admin, :routing_key => name) }
    end

    # NOTE(review): only :comm_name is cleared here; a :work_name set above is
    # still present in the COMM_MEM payloads below — confirm that is intended.
    req.delete(:comm_name)
    helper.get_community_members(sender).each do |comm_id, names|
      req[:user_type] = "COMM_MEM"
      req[:comm_id] = comm_id
      json_comm_member = JSON.generate(req)
      names.each { |name| exchange.publish(json_comm_member, :routing_key => name) }
    end
  when 'MESSAGE'
    # HTML-escape, then URI-encode the text before it is stored and relayed.
    # NOTE(review): URI.encode was removed in Ruby 3.0; this line needs a
    # replacement before the server can run on a modern Ruby.
    req[:message] = (URI.encode(CGI.escape_html(req[:message])))
    req[:row_key], req[:timestamp] = Chathistory.add_message(req)
    req[:full_name] = Helper.get_full_name(req[:username])
    json = JSON.generate(req)
    # Deliver to the recipient and echo back to the sender.
    exchange.publish(json, :routing_key => req[:message_to])
    exchange.publish(json, :routing_key => req[:username])
  when 'HISTORY'
    history = Chathistory.get_history_for_single_chat(req[:row_key])
    exchange.publish(JSON.generate({type: "HISTORY", history: history, row_key: req[:row_key]}), :routing_key => req[:username])
  when 'PONG'
    # The client answered a ping: undo one increment of the ping counter.
    count -= 1
  end
end
# Runs when a websocket closes: tells page owners the user left the page,
# unregisters the socket, then (after short grace periods) forgets the page and
# marks the user offline if no other socket of theirs remains.
ws.onclose do
  # The connection is gone, so stop pinging it. Left running, the ping timer
  # keeps writing to a closed socket and later repeats the logout logic.
  timer.cancel if timer
  username = ws.request["query"]["username"]
  page = CGI.unescapeHTML(ws.request["query"]["page"])
  exchange = channel.direct("")
  # NOTE(review): \S* is greedy, so the captured name can span several path
  # segments (e.g. "abc/discussions") — confirm this is intended.
  if match = page.match(/\/profile\/(\S*)\/updates/)
    exchange.publish(JSON.generate({username: username, type: 'OFF_PAGE', page: 'profile'}), :routing_key => match[1])
  elsif match = page.match(/\/communities\/(\S*)($|\/)/)
    community = match[1]
    # `page` comes from the client's query string, so it is passed as a bind
    # parameter rather than interpolated into the SQL text.
    owners = PermissionGroupUser.joins(:permission_group)
                                .where("permission_groups.permission_group = ?", "#{community} owners")
                                .select("user_name")
                                .collect(&:user_name).uniq - [username]
    json_off_page = JSON.generate({username: username, type: 'OFF_PAGE', page: 'community', community: community})
    owners.each do |user_name|
      exchange.publish(json_off_page, :routing_key => user_name)
    end
  end
  sm = SocketManager.new
  sm.remove_socket(username, ws)
  EM.add_timer(5) do
    # if there is no socket open with this page, remove the page from user's account
    ChatUser.remove_page(username, page) unless sm.socket_with_page_exists?(username, page)
  end
  # if lost the connection for more than 10 seconds mark the user as offline and remove the sockets else do nothing
  EM.add_timer(10) do
    # if there is no socket open for this user mark the status as offline
    unless sm.any_socket_exists?(username)
      # add status of user into activerecord database, so that it can be used by main application
      ChatUser.remove_all_pages(username)
      ChatUser.update_status(username, false)
      json_status = JSON.generate({username: username, status: 'offline', type: 'STATUS', full_name: Helper.get_full_name(username)})
      Helper.get_users_to_send(username).each do |name|
        exchange.publish(json_status, :routing_key => name)
      end
    end
  end
end
# Runs when the websocket reports an error: prints it, and for anything other
# than a handshake error treats the user as having left the page.
ws.onerror do |error|
  puts "error called ---- " + error.inspect
  puts error.backtrace
  # Handshake errors are only reported; no per-user cleanup is done for them.
  next if error.instance_of?(EventMachine::WebSocket::HandshakeError)

  username = ws.request["query"]["username"]
  page = CGI.unescapeHTML(ws.request["query"]["page"])
  exchange = channel.direct("")
  # NOTE(review): \S* is greedy, so the captured name can span several path
  # segments (e.g. "abc/discussions") — confirm this is intended.
  if match = page.match(/\/profile\/(\S*)\/updates/)
    exchange.publish(JSON.generate({username: username, type: 'OFF_PAGE', page: 'profile'}), :routing_key => match[1])
  elsif match = page.match(/\/communities\/(\S*)($|\/)/)
    community = match[1]
    # `page` comes from the client's query string, so it is passed as a bind
    # parameter rather than interpolated into the SQL text.
    owners = PermissionGroupUser.joins(:permission_group)
                                .where("permission_groups.permission_group = ?", "#{community} owners")
                                .select("user_name")
                                .collect(&:user_name).uniq - [username]
    json_off_page = JSON.generate({username: username, type: 'OFF_PAGE', page: 'community', community: community})
    owners.each do |user_name|
      exchange.publish(json_off_page, :routing_key => user_name)
    end
  end
  ChatUser.remove_page(username, page)
  SocketManager.new.remove_socket(username, ws)
  # if lost the connection for more than 10 seconds mark the user as offline and remove the sockets else do nothing
  EM.add_timer(10) do
    # if there is no socket open for this user mark the status as offline
    unless SocketManager.new.any_socket_exists?(username)
      # add status of user into activerecord database, so that it can be used by main application
      ChatUser.remove_all_pages(username)
      ChatUser.update_status(username, false)
      # Include full_name so this offline notice has the same fields as the
      # ones sent from the ping-timeout and close handlers.
      json_status = JSON.generate({username: username, status: 'offline', type: 'STATUS', full_name: Helper.get_full_name(username)})
      Helper.get_users_to_send(username).each do |name|
        exchange.publish(json_status, :routing_key => name)
      end
    end
  end
end
end
# On INT or TERM: flag every chat user as offline, send SERVER_DOWN to each
# registered socket, then stop the EventMachine loop.
%w[INT TERM].each do |signal_name|
  Signal.trap(signal_name) do
    ChatUser.update_all(online: false)
    open_sockets = SocketManager.new.sockets.values.flatten
    open_sockets.each do |sock|
      sock.send(JSON.generate({type: 'SERVER_DOWN'}))
    end
    EventMachine.stop
  end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment