Created
December 4, 2018 12:53
-
-
Save shasharoman/9825f74b0f37e37f2af0d629112cd887 to your computer and use it in GitHub Desktop.
websocket tunnel core code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const C = require('../constant'); | |
const nUrl = require('url'); | |
const uuid = require('uuid'); | |
const ws = require('ws'); | |
const lib = require(process.env.lib); | |
const logger = lib.logger.createLogger('tunnel-service-ws'); | |
const fw = require(process.env.fw); | |
// Module state. The maps are keyed by tunnel ids that are later read back from
// client-supplied query strings, so they are created without a prototype:
// inherited names such as "constructor" or "toString" must not look like
// registered ids.

// Tunnel id -> socket map, used by sendMessageById. An id maps to null between
// connectUrl() and the moment the client actually connects.
const id2Socket = Object.create(null);
// Tunnel id -> callback (notify) info; for now delivered via the internal HTTP RPC.
const id2Notify = Object.create(null);
// Tunnel id -> number of pings not yet answered, used to judge the client's connection state.
const id2UnresponsivePing = Object.create(null);

exports.init = init; // start the websocket listener
exports.connectUrl = connectUrl; // obtain a ws connection URL
exports.closeById = closeById; // actively close tunnels
exports.sendMessageById = sendMessageById; // send a message over tunnels
/**
 * Create the WebSocket server on C.HOST:C.PORT and wire up connection
 * handling, the text-based ping/pong keep-alive, and downstream notification.
 *
 * Only connections whose `id` query parameter was previously registered in
 * id2Socket (see connectUrl) are accepted; everything else is closed at once.
 *
 * @returns {Promise<Object>} promise resolved with an empty object
 */
function init() {
    const wss = new ws.Server({
        host: C.HOST,
        port: C.PORT,
        clientTracking: true
    });

    wss.on('connection', (socket, req) => {
        const id = nUrl.parse(req.url, true).query.id;

        // Reject every connection whose URL was not obtained from connectUrl.
        // An own-property check is used so that ids naming inherited object
        // members ("constructor", "toString", ...) cannot pass as registered,
        // and a repeated `id` parameter (parsed as an array) is refused too.
        const isRegistered = typeof id === 'string' && id !== '' &&
            Object.prototype.hasOwnProperty.call(id2Socket, id);
        if (!isRegistered) {
            logger.info('illegal connection', req.url);
            return socket.close(1000, 'CLOSE_NORMAL');
        }

        socket.id = id;
        socket.isAlive = true;
        id2Socket[id] = socket;
        id2UnresponsivePing[id] = 0;

        socket.on('close', (code, reason) => {
            logger.info(`[${id}] close`, code, reason);

            // Forget the tunnel if it has not reconnected within 10 minutes.
            // The check is tied to THIS socket: if the client reconnected in
            // the meantime, the newer socket owns the id and schedules its own
            // cleanup, so an old timer must not remove the tunnel early.
            setTimeout(() => {
                if (id2Socket[id] === socket && socket.readyState === ws.CLOSED) {
                    delete id2Socket[id];
                    delete id2Notify[id];
                    delete id2UnresponsivePing[id];
                }
            }, 600000);

            notifyDownstream(id, 'close', `${code}:${reason}`);
        });

        socket.on('error', err => {
            logger.error(`[${id}] error`, err);
            notifyDownstream(id, 'error', err.toString());
        });

        socket.on('message', msg => {
            // Any traffic from the client proves the connection is alive.
            socket.isAlive = true;
            id2UnresponsivePing[id] = 0;

            // Keep-alive frames are plain strings. Non-string payloads (e.g.
            // binary frames) have no match()/replace() and are passed through
            // as ordinary messages instead of throwing inside the handler.
            if (typeof msg === 'string') {
                // The server sends ping/<milliseconds> and expects
                // pong/<milliseconds> back from the client.
                if (msg.startsWith('pong')) {
                    const match = msg.match(/^pong\/(\d+)$/);
                    if (match) {
                        socket.rtt = Date.now() - Number(match[1]); // round trip time
                    }
                    return;
                }

                // Same design in the other direction: answer client pings.
                if (msg.startsWith('ping')) {
                    socket.send(msg.replace(/^ping/, 'pong'));
                    return;
                }
            }

            logger.info(`[${id}] message`, msg);
            notifyDownstream(id, 'message', msg);
        });
    });

    // Probe every tracked client every 3 seconds.
    const aliveTimer = setInterval(_detectAlive, 3000);

    wss.on('close', () => {
        // Stop probing once the server is gone so the timer does not linger.
        clearInterval(aliveTimer);
        logger.info('websocket server close');
    });

    wss.on('error', err => {
        logger.error('websocket server error');
        logger.error(err);
    });

    wss.on('listening', () => {
        logger.info('websocket server listening');
    });

    return Promise.resolve({});

    // Ping each client; close those with 5 or more unanswered pings.
    function _detectAlive() {
        wss.clients.forEach(item => {
            // Rejected connections never get an id; they are already closing.
            if (!item.id) {
                return;
            }

            const unresponsive = id2UnresponsivePing[item.id] || 0;
            if (unresponsive >= 5) {
                item.isAlive = false;
                item.close(1001, 'CLOSE_GOING_AWAY');
                return;
            }

            try {
                item.send(`ping/${Date.now()}`, () => {
                    // Re-read the counter: a message received since the ping
                    // was queued has reset it, and that reset must be kept.
                    id2UnresponsivePing[item.id] = (id2UnresponsivePing[item.id] || 0) + 1;
                });
            }
            catch (e) {
                logger.error(e);
            }
        });
    }
}
/**
 * Reserve a fresh tunnel id and describe how a client should connect with it.
 *
 * The id is registered in id2Socket with a null placeholder (no socket yet)
 * and the caller's notify info is stored in id2Notify under the same id.
 *
 * @param {Object} notify - callback info kept for this tunnel; notifyDownstream
 *     reads its `module` and `service` fields
 * @returns {Promise<{node: *, id: string, url: string}>} the node, the new id
 *     and the URL built from C.CONNECT_PREFIX, C.NODE and the id
 */
function connectUrl(notify) {
    const tunnelId = uuid.v4();

    id2Socket[tunnelId] = null;
    // For now communication goes through the framework's HTTP RPC; the
    // transport will be optimised separately once performance and reliability
    // have been observed.
    id2Notify[tunnelId] = notify;

    const descriptor = {
        node: C.NODE,
        id: tunnelId,
        url: `${C.CONNECT_PREFIX}/${C.NODE}?id=${tunnelId}`
    };
    return Promise.resolve(descriptor);
}
/**
 * Actively close the sockets registered for one or more tunnel ids.
 *
 * Ids with no socket in id2Socket (unknown, or not yet connected) are skipped.
 *
 * @param {string|string[]} ids - a single tunnel id or an array of ids
 * @param {number} code - close code handed to socket.close
 * @param {string} reason - close reason handed to socket.close
 * @returns {Promise<Object>} promise resolved with an empty object
 */
function closeById(ids, code, reason) {
    const targets = _.isArray(ids) ? ids : [ids];

    for (const tunnelId of targets) {
        const candidate = id2Socket[tunnelId];
        if (_.isEmpty(candidate)) {
            continue;
        }
        candidate.close(code, reason);
    }

    return Promise.resolve({});
}
/**
 * Send a message over one or more tunnels.
 *
 * A tunnel counts as failed when it has no socket, its socket is not in the
 * OPEN state, the socket is flagged as not alive, or the send itself errors.
 * Failures are logged and reported through the result; they do not reject.
 *
 * Relies on the Bluebird-style Promise.map / Promise.promisify helpers being
 * available on the Promise in scope.
 *
 * @param {string|string[]} ids - a single tunnel id or an array of ids
 * @param {*} msg - payload handed to socket.send
 * @returns {Promise<string[]>} the ids the message could NOT be delivered to
 */
function sendMessageById(ids, msg) {
    const targets = _.isArray(ids) ? ids : [ids];

    return Promise.map(targets, item => {
        const socket = id2Socket[item];
        if (_.isEmpty(socket)) {
            logger.warn('send message no socket', item);
            return item;
        }
        if (socket.readyState !== ws.OPEN) {
            logger.warn('send message socket state is not open', item, socket.readyState);
            return item;
        }
        if (!socket.isAlive) {
            // Was `logger.wran`, which threw a TypeError and rejected the whole
            // batch instead of reporting this id as failed.
            logger.warn('send message socket not alive', item);
            return item;
        }

        // socket.send takes a node-style completion callback; bind it to the
        // socket so `this` is preserved.
        return Promise.promisify(socket.send, {
            context: socket
        })(msg).then(() => {
            logger.info(`[${item}] send`, msg);
            return null; // null marks success and is filtered out below
        }).catch(err => {
            logger.error('send message error');
            logger.error(err);
            return item;
        });
    }).then(ret => _.filter(ret, item => !_.isNull(item))); // return failed ids
}
/**
 * Forward a tunnel event to the downstream service registered for the tunnel.
 *
 * The target is looked up in id2Notify and invoked through fw.serviceCall with
 * (id, type, msg). If the call fails, the error is logged and the tunnel is
 * closed with code 1001 and reason 'downstream error'.
 *
 * @param {string} id - tunnel id
 * @param {string} type - event kind; this file passes 'close', 'error' and 'message'
 * @param {*} msg - event payload
 * @returns {Promise} resolves once the notification has been attempted; resolves
 *     with an empty object when no notify info is stored for the id
 */
function notifyDownstream(id, type, msg) {
    const target = id2Notify[id];

    if (!_.isEmpty(target)) {
        const onDelivered = () => {
            logger.info(`[${id}] notify`, type, msg);
        };
        const onFailed = err => {
            logger.error('notify error', id);
            logger.error(err);
            closeById(id, 1001, 'downstream error');
        };

        return fw.serviceCall(target.module, target.service, id, type, msg)
            .then(onDelivered)
            .catch(onFailed);
    }

    logger.warn('no notify info', id);
    return Promise.resolve({});
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment