public
Last active — forked from fabware/application.js

socket.io application and haproxy configuration

  • Download Gist
application.js
JavaScript
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249
var express = require('express');
var redis = require('redis');
 
const serverType = process.argv[2];
const serverHost = process.argv[3];
const serverPort = parseInt(process.argv[4]);
 
const redisPort = 6379;
const redisHost = '127.0.0.1';
 
var rclient = new redis.createClient(redisPort, redisHost);
 
function writeUserMessageHost(userId, host){
rclient.set('MUPH:'+userId, host);
}
 
function deleteUserMessageHost(userId){
rclient.delete('MUPH:'+userId);
}
 
function writeChannelMessageHost(channel, host){
rclient.sadd('MCMH:'+channel, host);
}
 
function ListenerContainer(){
this.listeners = {};
this.listenersCount = 0;
};
 
ListenerContainer.prototype.addClient = function(client){
this.listeners[client.listenerId] = client;
this.listenersCount += 1;
writeUserMessageHost(client.listenerId, serverHost+':'+serverPort);
};
 
ListenerContainer.prototype.removeClient = function(client){
delete this.listeners[client.listenerId];
this.listenersCount -= 1;
deleteUserMessageHost(client.listenerId, serverHost+':'+serverPort);
};
 
ListenerContainer.prototype.getClients = function(listenerId){
var client = this.listeners[listenerId];
if (client != null){
return {listenerId: client};
}else{
return {};
};
};
 
ListenerContainer.prototype.count = function(listenerId){
return this.listenersCount;
1,17 Top
ListenerContainer.prototype.clientsCount = function(){
// find first listener
var firstListener = null;
for (listenerId in this.listeners){
var firstListener = this.listeners[listenerId].listener;
break;
};
var listenerCount = 0;
if (firstListener){
for(client in firstListener.clients){
listenerCount += 1;
};
};
return listenerCount;
};
 
function MultiListenerContainer() {
this.listeners = {};
this.listenersCount = {'__total': 0};
};
 
MultiListenerContainer.prototype.addClient = function(client){
// console.log('add client to channel:', client.listenerId, client.sessionId);
if (client.listenerId in this.listeners){
if(!(client.sessionId in this.listeners[client.listenerId])){
this.listeners[client.listenerId][client.sessionId] = client;
this.listenersCount[client.listenerId] += 1;
this.listenersCount['__total'] += 1;
};
 
}else{
this.listeners[client.listenerId] = {};
this.listeners[client.listenerId][client.sessionId] = client;
this.listenersCount[client.listenerId] = 1;
this.listenersCount['__total'] += 1;
// console.log('init client', client.sessionId);
};
writeChannelMessageHost(client.listenerId, serverHost+':'+serverPort);
};
 
MultiListenerContainer.prototype.removeClient = function(client){
if (client.listenerId in this.listeners){
if (client.sessionId in this.listeners[client.listenerId]){
delete this.listeners[client.listenerId][client.sessionId];
this.listenersCount[client.listenerId] -= 1;
this.listenersCount['__total'] -= 1;
}
}
};
MultiListenerContainer.prototype.getClients = function(listenerId){
if (listenerId in this.listeners){
var clients = this.listeners[listenerId];
if (clients){
return clients;
}else{
return {}
};
}
 
};
 
MultiListenerContainer.prototype.count = function(listenerId){
if (listenerId){
if (listenerId in this.listenersCount){
return this.listenersCount[listenerId];
}else{
return 0;
}
}else{
return this.listenersCount;
};
};
 
MultiListenerContainer.prototype.clientsCount = function(){
// find first listener
var firstListener = null;
for (listenerId in this.listeners){
for (sessionId in this.listeners[listenerId]){
var firstListener = this.listeners[listenerId][sessionId].listener;
break;
};
};
var listenerCount = 0;
if (firstListener){
for(client in firstListener.clients){
listenerCount += 1;
};
};
return listenerCount;
};
 
var userContainer = new ListenerContainer();
var channelContainer = new MultiListenerContainer();
 
var server = module.exports = express.createServer();
server.get('/post/:msg_target/:msg_target_id/:msg', function(req, res){
var sentCounter = 0;
if (req.params.msg_target == 'channel'){
var container = channelContainer;
}else if(req.params.msg_target == 'user')
{
var container = userContainer;
}
var clients = container.getClients(req.params.msg_target_id);
for(listenerId in clients){
var client = clients[listenerId];
var msgBuf = new Buffer(req.params.msg, 'base64');
var msgStr = msgBuf.toString('utf-8');
client.send(msgStr);
sentCounter += 1;
}
res.send(''+sentCounter);
});
 
server.get('/stats/count', function(req, res){
var result = {};
result['user'] = userContainer.count();
result['channel'] = channelContainer.count();
result['user_clients'] = userContainer.clientsCount();
result['channel_clients'] = channelContainer.clientsCount();
res.send(JSON.stringify(result));
});
server.get('/debug/check/session/:session_id', function(req, res){
var found = 'not found';
for(userId in userContainer.listeners){
var client = userContainer.listeners[userId];
if (client.sessionId == req.params.session_id){
found = 'found in user: '+userId;
}
};
for(channel in channelContainer.listeners){
for(sessionId in channelContainer.listeners[channel]){
if(sessionId == req.params.session_id){
found = 'found in channel: '+channel;
}
}
}
res.send(found);
});
 
function getRequestExtra(url){
return url.split('/')[2];
};
 
 
const io = require('socket.io');
 
function serveChannelMessage(){
console.log('starting channel server...');
const socket = io.listen(server, {'resource': 'channelmsg',
'flashPolicyServer': false,
'transports': ['flashsocket']});
 
socket.on('connection', function(client){
// console.dir(channelContainer);
var extra = getRequestExtra(client.request.url);
 
var extraParts = extra.split('_');
client.listenerId = extraParts[0];
var onlineToken = extraParts[1];
channelContainer.addClient(client);
 
client.on('message', function(msg){
});
 
client.on('disconnect', function(){
channelContainer.removeClient(this);
});
});
};
function serveUserMessage(){
const socket = io.listen(server, {'resource': 'usermsg',
'flashPolicyServer': false,
'transports': ['flashsocket']});
 
socket.on('connection', function(client){
client.listenerId = getRequestExtra(client.request.url);
userContainer.addClient(client);
 
client.on('message', function(msg){
});
 
client.on('disconnect', function(){
userContainer.removeClient(this);
});
});
 
};
 
if(serverType == 'user'){
serveUserMessage();
}else if(serverType == 'channel'){
serveChannelMessage();
}
 
server.listen(serverPort, serverHost);
haproxy.conf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
global
# daemon
maxconn 200000
pidfile /var/run/haproxy.pid
user nobody
group nobody
# debug
 
defaults
mode http
# option http-server-close
# option http-pretend-keepalive
# option httplog
stats uri /haproxy_stats
 
frontend flashpolicy 0.0.0.0:843
mode tcp
maxconn 20000
timeout client 86400000
default_backend flashpolicy_servers
 
backend flashpolicy_servers
mode tcp
balance roundrobin
timeout server 86400000
timeout connect 86400000
server fp1 10.0.0.55:10000 weight 1 maxconn 4000 check
server fp2 10.0.0.55:10001 weight 1 maxconn 4000 check
server fp3 10.0.0.55:10002 weight 1 maxconn 4000 check
server fp4 10.0.0.55:10003 weight 1 maxconn 4000 check
server fp5 10.0.0.55:10004 weight 1 maxconn 4000 check
 
frontend all 0.0.0.0:30000
maxconn 200000
timeout client 86400000
acl is_usermsg_stream path_dir usermsg
use_backend usermsg_stream_servers if is_usermsg_stream
acl is_channelmsg_stream path_dir channelmsg
use_backend channelmsg_stream_servers if is_channelmsg_stream
 
backend usermsg_stream_servers
balance roundrobin
option forwardfor
timeout queue 5000
timeout server 86400000
timeout connect 86400000
server stream20 10.0.0.55:40001 weight 1 maxconn 10000 check
server stream21 10.0.0.55:40002 weight 1 maxconn 10000 check
#server stream22 10.0.0.55:40003 weight 1 maxconn 10000 check
#server stream23 10.0.0.55:40004 weight 1 maxconn 10000 check
#server stream24 10.0.0.55:40005 weight 1 maxconn 10000 check
#server stream25 10.0.0.55:40006 weight 1 maxconn 10000 check
#server stream26 10.0.0.55:40007 weight 1 maxconn 10000 check
#server stream27 10.0.0.55:40008 weight 1 maxconn 10000 check
#server stream28 10.0.0.55:40009 weight 1 maxconn 10000 check
#server stream29 10.0.0.55:40010 weight 1 maxconn 10000 check
 
backend channelmsg_stream_servers
balance roundrobin
option forwardfor
timeout queue 5000
timeout server 86400000
timeout connect 86400000
server stream30 10.0.0.55:50001 weight 1 maxconn 20000 check
server stream31 10.0.0.55:50002 weight 1 maxconn 20000 check
#server stream32 10.0.0.55:50003 weight 1 maxconn 10000 check
#server stream33 10.0.0.55:50004 weight 1 maxconn 10000 check
#server stream34 10.0.0.55:50005 weight 1 maxconn 10000 check
#server stream35 10.0.0.55:50006 weight 1 maxconn 10000 check
#server stream36 10.0.0.55:50007 weight 1 maxconn 10000 check
#server stream37 10.0.0.55:50008 weight 1 maxconn 10000 check
#server stream38 10.0.0.55:50009 weight 1 maxconn 10000 check
#server stream39 10.0.0.55:50010 weight 1 maxconn 10000 check

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.