Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
websocket tunnel core code
// Module dependencies.
// NOTE(review): `_` and `Promise.map` / `Promise.promisify` are used further down but are
// never required in this file — presumably they are provided as globals
// (lodash-style / bluebird-style); confirm against the host framework.
const C = require('../constant');
const nUrl = require('url');
const uuid = require('uuid');
const ws = require('ws');
const lib = require(process.env.lib);
const logger = lib.logger.createLogger('tunnel-service-ws');
const fw = require(process.env.fw);
let id2Socket = {}; // channel id -> socket map, used when sending messages
let id2Notify = {}; // channel id -> callback (notify) info; for now delivered through the internal HTTP RPC
let id2UnresponsivePing = {}; // channel id -> number of pings without a response, used to judge the client's connection state
exports.init = init; // start the websocket listener
exports.connectUrl = connectUrl; // obtain a ws connection URL
exports.closeById = closeById; // actively close channel(s)
exports.sendMessageById = sendMessageById; // send a message over channel(s)
/**
 * Starts the websocket server on C.HOST:C.PORT, wires up per-connection
 * handlers and schedules the keep-alive check every 3 seconds.
 *
 * Connection rules:
 *  - only ids previously registered in `id2Socket` (see connectUrl) are accepted;
 *    anything else is closed with 1000/'CLOSE_NORMAL'.
 *  - every inbound message resets the unresponsive-ping counter of the channel.
 *  - `ping/<ms>` from the client is answered with `pong/<ms>`; `pong/<ms>` from
 *    the client is used to compute `socket.rtt`; everything else is forwarded
 *    through notifyDownstream(id, 'message', payload).
 *
 * @returns {Promise<Object>} resolves to an empty object once the handlers are registered.
 */
function init() {
  const wss = new ws.Server({
    host: C.HOST,
    port: C.PORT,
    clientTracking: true
  });

  // Decodes a payload for the text-based keep-alive protocol. Strings pass
  // through, Buffers are decoded (ws >= 8 hands text frames over as Buffers),
  // any other payload type is not inspected.
  function toText(payload) {
    if (typeof payload === 'string') {
      return payload;
    }
    if (Buffer.isBuffer(payload)) {
      return payload.toString();
    }
    return null;
  }

  wss.on('connection', (socket, req) => {
    const id = nUrl.parse(req.url, true).query.id;
    // Reject every connection whose URL was not issued by connectUrl. The lookup
    // is restricted to own string keys so that ids such as "__proto__" or
    // "constructor" cannot pass the check via Object.prototype and then corrupt the map.
    const isRegistered = typeof id === 'string' && id !== '' &&
      Object.prototype.hasOwnProperty.call(id2Socket, id);
    if (!isRegistered) {
      logger.info('illegal connection', req.url);
      return socket.close(1000, 'CLOSE_NORMAL');
    }
    socket.id = id;
    socket.isAlive = true;
    id2Socket[id] = socket;
    id2UnresponsivePing[id] = 0;

    socket.on('close', (code, reason) => {
      logger.info(`[${id}] close`, code, reason);
      // Drop the channel state if it has not been re-connected within 10 minutes
      // (i.e. the socket currently registered for the id is still a closed one).
      setTimeout(() => {
        const current = id2Socket[id];
        if (current && current.readyState === ws.CLOSED) {
          delete id2Socket[id];
          delete id2Notify[id];
          delete id2UnresponsivePing[id];
        }
      }, 600000);
      notifyDownstream(id, 'close', `${code}:${reason}`);
    });

    socket.on('error', err => {
      logger.error(`[${id}] error`, err);
      notifyDownstream(id, 'error', err.toString());
    });

    socket.on('message', (msg, isBinary) => {
      socket.isAlive = true;
      id2UnresponsivePing[id] = 0;
      const text = toText(msg);
      if (text !== null) {
        // send ping/milliseconds to client then expect pong/milliseconds from client
        if (text.startsWith('pong')) {
          const match = text.match(/^pong\/(\d+)$/);
          if (match) {
            socket.rtt = Date.now() - Number(match[1]); // round trip time
          }
          return;
        }
        // same design as server ping-pong
        if (text.startsWith('ping')) {
          socket.send(text.replace(/^ping/, 'pong'));
          return;
        }
      }
      // A text frame delivered as a Buffer (isBinary === false) is forwarded as a
      // string, every other payload is forwarded untouched.
      const payload = (isBinary === false && text !== null) ? text : msg;
      logger.info(`[${id}] message`, payload);
      notifyDownstream(id, 'message', payload);
    });
  });

  wss.on('close', () => {
    logger.info('websocket server close');
  });
  wss.on('error', err => {
    logger.error('websocket server error');
    logger.error(err);
  });
  wss.on('listening', () => {
    logger.info('websocket server listening');
  });

  setInterval(_detectAlive, 3000);
  return Promise.resolve({});

  // Pings every accepted client; a client with 5 or more unanswered pings is
  // marked as not alive and closed with 1001/'CLOSE_GOING_AWAY'.
  function _detectAlive() {
    wss.clients.forEach(client => {
      if (!client.id) {
        // Rejected connection that is still closing: it owns no channel state.
        return;
      }
      const unresponsive = id2UnresponsivePing[client.id] || 0;
      if (unresponsive >= 5) {
        client.isAlive = false;
        client.close(1001, 'CLOSE_GOING_AWAY');
        return;
      }
      try {
        client.send(`ping/${Date.now()}`, () => {
          // Re-read the counter: a message may have reset it since the ping was queued.
          id2UnresponsivePing[client.id] = (id2UnresponsivePing[client.id] || 0) + 1;
        });
      }
      catch (e) {
        logger.error(e);
      }
    });
  }
}
/**
 * Registers a new channel and hands back the URL a client must use to connect.
 *
 * A fresh uuid v4 becomes the channel id; it is recorded in `id2Socket` with a
 * `null` socket (so the later connection is recognised) and `notify` is stored
 * in `id2Notify` for that id.
 *
 * @param {Object} notify callback info stored for the channel (read later as
 *   `notify.module` / `notify.service` when notifying downstream).
 * @returns {Promise<{node: *, id: string, url: string}>} node name, channel id and connect URL.
 */
function connectUrl(notify) {
  const channelId = uuid.v4();

  // Reserve the id before any socket exists.
  id2Socket[channelId] = null;
  // For now communication relies on the framework's internal HTTP RPC; to be
  // optimised separately once performance and reliability have been observed.
  id2Notify[channelId] = notify;

  const descriptor = {
    node: C.NODE,
    id: channelId,
    url: `${C.CONNECT_PREFIX}/${C.NODE}?id=${channelId}`
  };
  return Promise.resolve(descriptor);
}
/**
 * Actively closes the sockets registered for one or more channel ids.
 * Ids without a registered socket are silently skipped.
 *
 * @param {string|string[]} ids a single channel id or a list of ids.
 * @param {number} code close code passed to `socket.close`.
 * @param {string} reason close reason passed to `socket.close`.
 * @returns {Promise<Object>} resolves to an empty object.
 */
function closeById(ids, code, reason) {
  const targets = _.isArray(ids) ? ids : [ids];
  for (const channelId of targets) {
    const socket = id2Socket[channelId];
    if (_.isEmpty(socket)) {
      continue;
    }
    socket.close(code, reason);
  }
  return Promise.resolve({});
}
/**
 * Sends `msg` over the sockets registered for one or more channel ids.
 *
 * A delivery counts as failed when the id has no socket, the socket is not in
 * the OPEN state, the socket is flagged as not alive, or `socket.send` reports
 * an error. Failures are logged and never reject the returned promise.
 *
 * @param {string|string[]} ids a single channel id or a list of ids.
 * @param {*} msg payload handed to `socket.send`.
 * @returns {Promise<Array>} resolves to the ids (in input order) whose delivery failed.
 */
function sendMessageById(ids, msg) {
  const targets = Array.isArray(ids) ? ids : [ids];

  // One entry per target: `true` when delivered, `false` otherwise. Tracking a
  // flag per index (instead of filtering on the id value) keeps every failed
  // id in the result, whatever its value is.
  const attempts = targets.map(id => {
    const socket = id2Socket[id];
    if (!socket) {
      logger.warn('send message no socket', id);
      return false;
    }
    if (socket.readyState !== ws.OPEN) {
      logger.warn('send message socket state is not open', id, socket.readyState);
      return false;
    }
    if (!socket.isAlive) {
      logger.warn('send message socket not alive', id);
      return false;
    }
    // Adapt the node-style callback of socket.send; a synchronous throw inside
    // the executor is turned into a rejection as well.
    return new Promise((resolve, reject) => {
      socket.send(msg, err => (err ? reject(err) : resolve()));
    }).then(() => {
      logger.info(`[${id}] send`, msg);
      return true;
    }).catch(err => {
      logger.error('send message error');
      logger.error(err);
      return false;
    });
  });

  return Promise.all(attempts)
    .then(delivered => targets.filter((id, index) => !delivered[index])); // return failed ids
}
/**
 * Reports a channel event to the downstream service registered for the channel.
 *
 * Looks up the notify info stored for `id` and invokes
 * `fw.serviceCall(module, service, id, type, msg)`. When the call fails, the
 * error is logged and the channel is closed with 1001/'downstream error'.
 * When no notify info exists, a warning is logged and nothing is called.
 *
 * @param {string} id channel id.
 * @param {string} type event type passed through to the downstream call.
 * @param {*} msg event payload passed through to the downstream call.
 * @returns {Promise} resolves to `{}` when there is no notify info, otherwise
 *   to `undefined` once the call has succeeded or its failure has been handled.
 */
function notifyDownstream(id, type, msg) {
  const target = id2Notify[id];

  if (!_.isEmpty(target)) {
    const onDelivered = () => {
      logger.info(`[${id}] notify`, type, msg);
    };
    const onFailed = err => {
      logger.error('notify error', id);
      logger.error(err);
      closeById(id, 1001, 'downstream error');
    };
    return fw.serviceCall(target.module, target.service, id, type, msg)
      .then(onDelivered)
      .catch(onFailed);
  }

  logger.warn('no notify info', id);
  return Promise.resolve({});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.