public
Last active — forked from fabware/application.js

socket.io application and haproxy configuration

  • Download Gist
application.js
JavaScript

var express = require('express');
var redis = require('redis');
 
// Command line: node application.js <serverType> <serverHost> <serverPort>
// serverType selects which socket.io endpoint this process serves ('user' or 'channel').
const serverType = process.argv[2];
// Address the HTTP server listens on; also published to redis as part of 'host:port'.
const serverHost = process.argv[3];
// Explicit radix so the port is always read as decimal; NaN when the argument is missing.
const serverPort = parseInt(process.argv[4], 10);
 
// Location of the redis instance used to publish which host serves a user/channel.
const redisPort = 6379;
const redisHost = '127.0.0.1';

// createClient is a factory function, so it is called without `new`.
var rclient = redis.createClient(redisPort, redisHost);

// node_redis reports connection problems through 'error' events; an 'error'
// event without a listener is thrown by the EventEmitter and kills the process.
rclient.on('error', function(err){
  console.error('redis error:', err);
});
 
/**
 * Stores the message host for a user under the redis key 'MUPH:<userId>'.
 *
 * @param {string} userId - Identifier appended to the 'MUPH:' key prefix.
 * @param {string} host - Value to store; callers in this file pass 'host:port'.
 */
function writeUserMessageHost(userId, host){
  var key = 'MUPH:' + userId;
  rclient.set(key, host);
}
 
/**
 * Removes the message-host record for a user (redis key 'MUPH:<userId>').
 *
 * The redis command for removing a key is DEL, exposed by the client as
 * `del`; the client has no `delete` method, so the previous call threw a
 * TypeError whenever a user disconnected.
 *
 * @param {string} userId - Identifier appended to the 'MUPH:' key prefix.
 */
function deleteUserMessageHost(userId){
  rclient.del('MUPH:' + userId);
}
 
/**
 * Adds a host to the redis set stored at 'MCMH:<channel>'.
 *
 * @param {string} channel - Identifier appended to the 'MCMH:' key prefix.
 * @param {string} host - Set member to add; callers in this file pass 'host:port'.
 */
function writeChannelMessageHost(channel, host){
  var key = 'MCMH:' + channel;
  rclient.sadd(key, host);
}
 
/**
 * Registry of connected clients holding at most one client per listenerId.
 *
 * `listeners` maps listenerId -> client; `listenersCount` is the number of
 * ids currently registered.
 */
function ListenerContainer(){
  // Prototype-less map: ids such as 'constructor' or '__proto__' are plain
  // keys and can never match members inherited from Object.prototype.
  this.listeners = Object.create(null);
  this.listenersCount = 0;
}

/**
 * Registers a client under client.listenerId (replacing any client already
 * stored under that id) and publishes this server as the user's message host.
 *
 * @param {Object} client - Must carry a `listenerId` property.
 */
ListenerContainer.prototype.addClient = function(client){
  // Count distinct ids only; re-adding an existing id must not inflate the count.
  if (!(client.listenerId in this.listeners)){
    this.listenersCount += 1;
  }
  this.listeners[client.listenerId] = client;
  writeUserMessageHost(client.listenerId, serverHost+':'+serverPort);
};

/**
 * Unregisters the client stored under client.listenerId and deletes the
 * user's message-host record.
 *
 * NOTE(review): the entry is removed by id, not by identity, so a late
 * disconnect from an old connection also removes a newer connection that
 * reused the id — confirm whether that is intended.
 *
 * @param {Object} client - Must carry a `listenerId` property.
 */
ListenerContainer.prototype.removeClient = function(client){
  // Only decrement for ids that are actually registered so the count cannot go negative.
  if (client.listenerId in this.listeners){
    delete this.listeners[client.listenerId];
    this.listenersCount -= 1;
  }
  deleteUserMessageHost(client.listenerId);
};

/**
 * Looks up the client registered under an id.
 *
 * @param {string} listenerId
 * @returns {Object} Map with a single entry listenerId -> client, or an
 *   empty map when nothing is registered under that id.
 */
ListenerContainer.prototype.getClients = function(listenerId){
  var found = Object.create(null);
  var client = this.listeners[listenerId];
  if (client != null){
    // Key by the actual id (an object literal `{listenerId: ...}` would use
    // the literal string 'listenerId' as the key).
    found[listenerId] = client;
  }
  return found;
};

/**
 * @param {string} [listenerId] - Ignored; accepted so the signature matches
 *   MultiListenerContainer.prototype.count.
 * @returns {number} Number of registered ids.
 */
ListenerContainer.prototype.count = function(listenerId){
  return this.listenersCount;
};

/**
 * Counts the entries of `listener.clients` on the first registered client.
 *
 * @returns {number} That count, or 0 when no client is registered or the
 *   first client has no `listener`.
 */
ListenerContainer.prototype.clientsCount = function(){
  // find first listener
  var firstListener = null;
  var listenerId;
  for (listenerId in this.listeners){
    firstListener = this.listeners[listenerId].listener;
    break;
  }
  var total = 0;
  var key;
  if (firstListener){
    for (key in firstListener.clients){
      total += 1;
    }
  }
  return total;
};
 
/**
 * Registry of connected clients grouped by listenerId (channel), with any
 * number of clients per channel keyed by their sessionId.
 *
 * `listeners` maps listenerId -> (sessionId -> client).
 * `listenersCount` maps listenerId -> number of clients, plus the reserved
 * key '__total' holding the sum over all channels.
 */
function MultiListenerContainer() {
  // Prototype-less maps: a channel id such as 'constructor' is a plain key
  // and can never resolve to a member inherited from Object.prototype.
  this.listeners = Object.create(null);
  this.listenersCount = Object.create(null);
  this.listenersCount['__total'] = 0;
}

/**
 * Registers a client under (client.listenerId, client.sessionId) and records
 * this server as a message host for the channel. Adding a session that is
 * already registered leaves the counts unchanged.
 *
 * @param {Object} client - Must carry `listenerId` and `sessionId`.
 */
MultiListenerContainer.prototype.addClient = function(client){
  var channelId = client.listenerId;
  var sessions = this.listeners[channelId];
  if (!sessions){
    sessions = this.listeners[channelId] = Object.create(null);
    this.listenersCount[channelId] = 0;
  }
  if (!(client.sessionId in sessions)){
    sessions[client.sessionId] = client;
    this.listenersCount[channelId] += 1;
    this.listenersCount['__total'] += 1;
  }
  writeChannelMessageHost(channelId, serverHost+':'+serverPort);
};

/**
 * Unregisters a client; does nothing when the session is not registered.
 *
 * @param {Object} client - Must carry `listenerId` and `sessionId`.
 */
MultiListenerContainer.prototype.removeClient = function(client){
  var channelId = client.listenerId;
  var sessions = this.listeners[channelId];
  if (!sessions || !(client.sessionId in sessions)){
    return;
  }
  delete sessions[client.sessionId];
  this.listenersCount[channelId] -= 1;
  this.listenersCount['__total'] -= 1;
  // Release the per-channel map once its last client is gone so empty
  // channels do not accumulate; the per-channel count entry stays at 0.
  if (this.listenersCount[channelId] === 0){
    delete this.listeners[channelId];
  }
};

/**
 * @param {string} listenerId - Channel id.
 * @returns {Object} Map sessionId -> client for the channel (the live
 *   internal map), or an empty object when the channel has no clients.
 */
MultiListenerContainer.prototype.getClients = function(listenerId){
  return this.listeners[listenerId] || {};
};

/**
 * @param {string} [listenerId] - Channel id.
 * @returns {number|Object} The client count for the channel (0 if unknown);
 *   when listenerId is omitted or falsy, the whole count map including '__total'.
 */
MultiListenerContainer.prototype.count = function(listenerId){
  if (!listenerId){
    return this.listenersCount;
  }
  if (listenerId in this.listenersCount){
    return this.listenersCount[listenerId];
  }
  return 0;
};

/**
 * Counts the entries of `listener.clients` on the first registered client.
 *
 * @returns {number} That count, or 0 when no client is registered or the
 *   first client has no `listener`.
 */
MultiListenerContainer.prototype.clientsCount = function(){
  // find first listener
  var firstListener = null;
  var channelId;
  var sessionId;
  search:
  for (channelId in this.listeners){
    for (sessionId in this.listeners[channelId]){
      firstListener = this.listeners[channelId][sessionId].listener;
      // Leave both loops: a plain `break` would only end the inner one.
      break search;
    }
  }
  var total = 0;
  var key;
  if (firstListener){
    for (key in firstListener.clients){
      total += 1;
    }
  }
  return total;
};
 
// Registries for the two endpoint kinds: one client per user id, many
// clients per channel id.
const userContainer = new ListenerContainer();
const channelContainer = new MultiListenerContainer();

// The HTTP server is also this module's export.
const server = express.createServer();
module.exports = server;
/**
 * GET /post/:msg_target/:msg_target_id/:msg
 *
 * Delivers a message to every client registered under msg_target_id in the
 * user or channel container and responds with the number of clients it was
 * sent to. :msg is the base64 encoding of a UTF-8 string.
 *
 * Responds 404 when :msg_target is neither 'channel' nor 'user'; previously
 * that case left the container undefined and the handler threw a TypeError.
 */
server.get('/post/:msg_target/:msg_target_id/:msg', function(req, res){
  var container;
  if (req.params.msg_target === 'channel'){
    container = channelContainer;
  }else if (req.params.msg_target === 'user'){
    container = userContainer;
  }else{
    res.send('unknown message target', 404);
    return;
  }

  // Decode once instead of once per recipient.
  // NOTE(review): `new Buffer(string, encoding)` is kept for the Node version
  // this file targets; on current Node use Buffer.from(string, 'base64').
  var msgStr = new Buffer(req.params.msg, 'base64').toString('utf-8');

  // `|| {}` guards against a container that returns nothing for an unknown id.
  var clients = container.getClients(req.params.msg_target_id) || {};
  var sentCounter = 0;
  var key;
  for (key in clients){
    clients[key].send(msgStr);
    sentCounter += 1;
  }
  res.send(''+sentCounter);
});
 
/**
 * GET /stats/count
 *
 * Responds with a JSON document holding the registration counts of both
 * containers ('user', 'channel') and their clientsCount() values
 * ('user_clients', 'channel_clients').
 */
server.get('/stats/count', function(req, res){
  var stats = {
    'user': userContainer.count(),
    'channel': channelContainer.count(),
    'user_clients': userContainer.clientsCount(),
    'channel_clients': channelContainer.clientsCount()
  };
  res.send(JSON.stringify(stats));
});
/**
 * GET /debug/check/session/:session_id
 *
 * Scans both containers for the given session id and responds with
 * 'found in user: <id>', 'found in channel: <id>' or 'not found'. Every
 * entry is scanned, so when several match the last match is reported
 * (channels are scanned after users).
 */
server.get('/debug/check/session/:session_id', function(req, res){
  var wanted = req.params.session_id;
  var found = 'not found';
  // Loop variables are declared locally; they used to leak as implicit globals.
  var userId;
  var channel;
  var sessionId;

  for (userId in userContainer.listeners){
    // Loose equality is kept on purpose: the route parameter is a string and
    // the stored sessionId's type is not visible here.
    if (userContainer.listeners[userId].sessionId == wanted){
      found = 'found in user: '+userId;
    }
  }
  for (channel in channelContainer.listeners){
    for (sessionId in channelContainer.listeners[channel]){
      if (sessionId == wanted){
        found = 'found in channel: '+channel;
      }
    }
  }
  res.send(found);
});
 
/**
 * Returns the third '/'-separated piece of a URL string (index 2 after
 * splitting on '/'), e.g. 'abc' for '/usermsg/abc/flashsocket'.
 *
 * @param {string} url
 * @returns {string|undefined} The piece, or undefined when the URL has fewer
 *   than three pieces.
 */
function getRequestExtra(url){
  var pieces = url.split('/');
  return pieces[2];
}
 
 
const io = require('socket.io');
 
/**
 * Attaches the socket.io endpoint for channel messages (resource
 * 'channelmsg', flashsocket transport only) to the HTTP server.
 *
 * Each connection's URL carries '<listenerId>_<token>' as its third path
 * piece; the client is registered in channelContainer under listenerId and
 * unregistered again on disconnect.
 */
function serveChannelMessage(){
  console.log('starting channel server...');
  const socket = io.listen(server, {'resource': 'channelmsg',
                                    'flashPolicyServer': false,
                                    'transports': ['flashsocket']});

  socket.on('connection', function(client){
    var extra = getRequestExtra(client.request.url);
    if (!extra){
      // A URL without the extra piece used to throw on `extra.split` inside
      // this handler; skip registration instead of failing.
      console.log('channel connection without listener id:', client.request.url);
      return;
    }

    // Only the part before the first '_' is used; the token after it is not
    // consumed anywhere in this handler.
    client.listenerId = extra.split('_')[0];
    channelContainer.addClient(client);

    // Incoming client messages are ignored.
    client.on('message', function(msg){
    });

    client.on('disconnect', function(){
      channelContainer.removeClient(this);
    });
  });
}
/**
 * Attaches the socket.io endpoint for user messages (resource 'usermsg',
 * flashsocket transport only) to the HTTP server.
 *
 * Each connection is registered in userContainer under the id taken from
 * its request URL and unregistered again on disconnect.
 */
function serveUserMessage(){
  var options = {
    'resource': 'usermsg',
    'flashPolicyServer': false,
    'transports': ['flashsocket']
  };
  const listener = io.listen(server, options);

  listener.on('connection', function(client){
    var userId = getRequestExtra(client.request.url);
    client.listenerId = userId;
    userContainer.addClient(client);

    // Incoming client messages are ignored.
    client.on('message', function(msg){
    });

    client.on('disconnect', function(){
      userContainer.removeClient(this);
    });
  });
}
 
// Attach the socket.io endpoint selected on the command line; any other
// serverType attaches none and only the HTTP routes are served.
switch (serverType){
  case 'user':
    serveUserMessage();
    break;
  case 'channel':
    serveChannelMessage();
    break;
}

// Start accepting connections on the address given on the command line.
server.listen(serverPort, serverHost);
haproxy.conf
# Process-wide settings.
global
# daemon
# Upper bound on concurrent connections for the whole haproxy process.
maxconn 200000
pidfile /var/run/haproxy.pid
# Run the process as nobody:nobody.
user nobody
group nobody
# debug
 
# Settings inherited by every frontend/backend below unless overridden
# (the flashpolicy sections override the mode with tcp).
defaults
mode http
# option http-server-close
# option http-pretend-keepalive
# option httplog
# Serve the statistics report at this URI.
stats uri /haproxy_stats
 
# TCP listener on port 843 for all interfaces; everything is handed to
# flashpolicy_servers.
frontend flashpolicy 0.0.0.0:843
mode tcp
maxconn 20000
# Timeouts without a unit suffix are in milliseconds: 86400000 ms = 24 hours.
timeout client 86400000
default_backend flashpolicy_servers
 
# Round-robin pool of five health-checked TCP servers on 10.0.0.55,
# ports 10000-10004, each limited to 4000 concurrent connections.
backend flashpolicy_servers
mode tcp
balance roundrobin
# 86400000 ms = 24 hours (also applied to connection establishment below).
timeout server 86400000
timeout connect 86400000
server fp1 10.0.0.55:10000 weight 1 maxconn 4000 check
server fp2 10.0.0.55:10001 weight 1 maxconn 4000 check
server fp3 10.0.0.55:10002 weight 1 maxconn 4000 check
server fp4 10.0.0.55:10003 weight 1 maxconn 4000 check
server fp5 10.0.0.55:10004 weight 1 maxconn 4000 check
 
# HTTP listener on port 30000 (mode http comes from defaults). Requests are
# routed by path: a path containing the directory component 'usermsg' goes to
# the user-message pool, 'channelmsg' to the channel-message pool.
# NOTE(review): no default_backend is set, so requests matching neither ACL
# have no backend to go to — confirm this is intended.
frontend all 0.0.0.0:30000
maxconn 200000
# 86400000 ms = 24 hours.
timeout client 86400000
acl is_usermsg_stream path_dir usermsg
use_backend usermsg_stream_servers if is_usermsg_stream
acl is_channelmsg_stream path_dir channelmsg
use_backend channelmsg_stream_servers if is_channelmsg_stream
 
# Round-robin pool for user-message connections: two active health-checked
# servers on 10.0.0.55 (ports 40001-40002), each limited to 10000 concurrent
# connections. Eight more server lines are kept commented out.
backend usermsg_stream_servers
balance roundrobin
# Add an X-Forwarded-For header carrying the client address.
option forwardfor
# Wait at most 5000 ms in the queue for a free connection slot.
timeout queue 5000
# 86400000 ms = 24 hours.
timeout server 86400000
timeout connect 86400000
server stream20 10.0.0.55:40001 weight 1 maxconn 10000 check
server stream21 10.0.0.55:40002 weight 1 maxconn 10000 check
#server stream22 10.0.0.55:40003 weight 1 maxconn 10000 check
#server stream23 10.0.0.55:40004 weight 1 maxconn 10000 check
#server stream24 10.0.0.55:40005 weight 1 maxconn 10000 check
#server stream25 10.0.0.55:40006 weight 1 maxconn 10000 check
#server stream26 10.0.0.55:40007 weight 1 maxconn 10000 check
#server stream27 10.0.0.55:40008 weight 1 maxconn 10000 check
#server stream28 10.0.0.55:40009 weight 1 maxconn 10000 check
#server stream29 10.0.0.55:40010 weight 1 maxconn 10000 check
 
# Round-robin pool for channel-message connections: two active health-checked
# servers on 10.0.0.55 (ports 50001-50002), each limited to 20000 concurrent
# connections.
# NOTE(review): the commented-out server lines use maxconn 10000 while the
# active ones use 20000 — align them before re-enabling.
backend channelmsg_stream_servers
balance roundrobin
# Add an X-Forwarded-For header carrying the client address.
option forwardfor
# Wait at most 5000 ms in the queue for a free connection slot.
timeout queue 5000
# 86400000 ms = 24 hours.
timeout server 86400000
timeout connect 86400000
server stream30 10.0.0.55:50001 weight 1 maxconn 20000 check
server stream31 10.0.0.55:50002 weight 1 maxconn 20000 check
#server stream32 10.0.0.55:50003 weight 1 maxconn 10000 check
#server stream33 10.0.0.55:50004 weight 1 maxconn 10000 check
#server stream34 10.0.0.55:50005 weight 1 maxconn 10000 check
#server stream35 10.0.0.55:50006 weight 1 maxconn 10000 check
#server stream36 10.0.0.55:50007 weight 1 maxconn 10000 check
#server stream37 10.0.0.55:50008 weight 1 maxconn 10000 check
#server stream38 10.0.0.55:50009 weight 1 maxconn 10000 check
#server stream39 10.0.0.55:50010 weight 1 maxconn 10000 check

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.