Skip to content

Instantly share code, notes, and snippets.

@intech
Last active December 1, 2021 16:53
Show Gist options
  • Save intech/e5ea80f2750809de12c53032188be2d4 to your computer and use it in GitHub Desktop.
Save intech/e5ea80f2750809de12c53032188be2d4 to your computer and use it in GitHub Desktop.
Moleculer Socket.IO mixin with auto-alias
module.exports = {
name: "test",
version: 1,
actions: {
test: {
ws: {
name: "test"
},
async handler(ctx) {
const { user } = ctx.meta;
// on client emit event msg with payload { msg }
ctx.broadcast("io.msg", { msg: "event from server" }, {
meta: { rooms: [user] }
}.catch(err => this.logger.error(err));
return user;
}
}
}
};
const _ = require("lodash");
const { Server } = require("socket.io");
const { ServiceNotFoundError, MoleculerClientError } = require("moleculer").Errors;
const { UnAuthorizedError, ERR_INVALID_TOKEN, ERR_NO_TOKEN } = require("moleculer-web").Errors;
module.exports = {
name: "ws",
settings: {
$shutdownTimeout: 5000,
debounceTime: 500,
io: {
//socket.io options
options: {
path: "/ws",
pingInterval: 1000,
pingTimeout: 5000,
connectTimeout: 5000,
serveClient: false,
cookie: false
// transports: ["websocket"]
}
}
},
events: {
async "$services.changed"() {
await this.regenerateEventsAuto();
},
"io.latency": {
ws: {
name: "latency"
},
group: "io",
async handler() {
return Promise.resolve();
}
},
"io.join": {
ws: {
name: "join"
},
group: "io",
async handler(ctx) {
const { params, meta } = ctx;
this.logger.debug("join:", { meta }, params);
this.io.to(meta.socket).socketsJoin(params);
}
},
"io.leave": {
ws: {
name: "leave"
},
group: "io",
async handler(ctx) {
const { params, meta } = ctx;
this.logger.debug("leave:", { meta }, params);
this.io.to(meta.socket).socketsLeave(params);
}
},
"io.**": {
group: "io",
async handler(ctx) {
const { eventName, params, meta } = ctx;
if (meta.from === "client") return;
const event = eventName.substr(3);
this.logger.debug("[IO] emit: ", eventName, event, params, meta);
let namespace = this.io;
if (meta.namespace) namespace = namespace.of(meta.namespace);
// that the event data may be lost if the client is not ready to receive messages
if (meta.volate) namespace = namespace.volate;
// the event data will only be broadcast to every sockets but the sender
if (meta.broadcast) namespace = namespace.broadcast;
if (meta.rooms) {
for (let room of meta.rooms) {
const groups = room.split(":");
if (groups.length) {
for (let i = 1; i <= groups.length; i++) {
const wildcard = groups.slice(0, i).join(":");
this.logger.debug(`emit wildcard ${i} groups:`, wildcard);
namespace = namespace.to(wildcard);
}
}
}
}
if (params) {
namespace.emit(event, params);
} else {
namespace.emit(event);
}
}
}
},
methods: {
/**
* Init Socket.IO
* @param opts
*/
initSocketIO(opts = {}) {
opts = {
...(this.settings.io && this.settings.io.options ? this.settings.io.options : {}),
cors: this.settings.cors || "*"
};
this.io = new Server(this.server, opts);
},
/**
* Get meta data from socket
* @param socket
* @return {*&{$rooms: string[]}}
*/
socketGetMeta(socket) {
let data = {
...socket.data,
$rooms: Object.keys(socket.rooms)
};
this.logger.debug("getMeta", data);
return data;
},
/**
* Set meta data in socket
* @param socket
* @param data
* @return {*|(*)}
*/
socketSaveMeta(socket, data) {
socket.data = {
...socket.data,
...data
};
this.logger.debug("setMeta", socket.data);
return socket.data;
},
/**
* Formatter error
* @param err
* @param respond
* @return {*}
*/
socketOnError(err, respond) {
const errObj = _.pick(err, ["name", "message", "code", "type", "data"]);
return respond(errObj);
},
/**
* Get handler action by event name
* @param namespace
* @param eventName
* @param args
* @return {Promise<unknown>}
*/
async getEventHandler(namespace, eventName, ...args) {
this.logger.debug("getEventHandler:", namespace, eventName, args);
// Check endpoint visibility
const handler = this.handlers.get(`${namespace}${eventName}`);
if (!handler) {
// Action can't be published
throw new ServiceNotFoundError({ eventName });
}
this.logger.debug("getEventHandler action:", handler);
return handler;
},
/**
* Registration all events with ws in registry
*/
async registrationEvents() {
const events = this.broker.registry.events.list({
onlyLocal: false,
onlyAvailable: true,
skipInternal: true,
withEndpoints: false
});
for (const { name, group, event } of events) {
const fullName = [group, name].join(".");
if ("ws" in event) {
const { namespace = "/", name = fullName } = event.ws;
this.logger.debug("Add route from event:", namespace, name, event.ws);
// register if not exists namespace
if (!this.namespaces.has(namespace)) this.namespaces.add(namespace);
// register if not exists action in namespace
if (!this.handlers.has(`${namespace}${name}`)) {
this.handlers.set(`${namespace}${name}`, {
type: "event",
name: event.name
});
}
}
}
},
/**
* Registration all actions with ws in registry
*/
async registrationActions() {
const actions = this.broker.registry.actions.list({
onlyLocal: false,
onlyAvailable: true,
skipInternal: true,
withEndpoints: false
});
for (const { name: fullName, action } of actions) {
if ("ws" in action) {
const { namespace = "/", name = fullName } = action.ws;
this.logger.debug("Add route:", fullName, namespace, name, action.ws);
// register if not exists namespace
if (!this.namespaces.has(namespace)) this.namespaces.add(namespace);
// register if not exists action in namespace
if (!this.handlers.has(`${namespace}${name}`)) {
this.handlers.set(`${namespace}${name}`, {
type: "action",
name: fullName
});
}
}
}
}
},
created() {
this.namespaces = new Set();
this.handlers = new Map();
},
async started() {
if (!this.io) this.initSocketIO();
this.io.engine.on("connection_error", err => {
console.log(err);
});
const debounceTime = Number(this.settings.debounceTime) || 500;
this.regenerateEventsAuto = _.debounce(
() => Promise.all([this.registrationEvents(), this.registrationActions()]),
debounceTime
);
this.io.use(async (socket, next) => {
if ("auth" in socket.handshake && "token" in socket.handshake.auth) {
try {
const { auth, headers, url } = socket.handshake;
this.logger.debug(`Socket ${socket.id} auth check:`, {
token: auth.token,
path: `${headers["host"]}${url}`
});
const uuid = await this.broker.call("v1.auth.check", {
token: auth.token,
path: `${headers["host"]}${url}`
});
this.logger.debug({ uuid });
socket.join(uuid);
this.socketSaveMeta(socket, { socket: socket.id, user: uuid });
return next();
} catch (e) {
this.logger.error(new UnAuthorizedError(ERR_INVALID_TOKEN, e));
return next(new UnAuthorizedError(ERR_INVALID_TOKEN, e));
}
} else {
// return next();
this.logger.error(new UnAuthorizedError(ERR_NO_TOKEN));
return next(new UnAuthorizedError(ERR_NO_TOKEN));
}
});
this.io.on("connection", socket => {
this.logger.debug("Client connected:", socket.id, socket.nsp.name);
socket.on("error", err => {
this.logger.error("[IO]:", err.toString());
if (err && err.code === 401) socket.disconnect();
});
socket.onAny(async (eventName, ...args) => {
const opts = {
meta: {
// socket: socket.id,
...this.socketGetMeta(socket),
from: "client"
}
};
let cb,
params = {};
if (Array.isArray(args)) {
// TODO: add format validation:
// io.emit("event.name", { params }, callback(response) => {})
// io.emit("event.name", callback(response) => {})
// io.emit("event.name", { params })
// io.emit("event.name")
this.logger.debug("args is array", args.length);
if (typeof args[args.length - 1] === "function") {
cb = args.pop();
this.logger.debug("args with callback", cb);
}
params = args.shift() || {};
this.logger.debug("with params", params);
}
try {
this.logger.debug(socket.nsp.name, eventName, ...args);
const { type, name } = await this.getEventHandler(
socket.nsp.name,
eventName,
...args
);
this.logger.debug({ type, name });
switch (type) {
case "action": {
// io.emit("event.name", { params }, callback(response) => {})
// io.emit("event.name", callback(response) => {})
const endpoint = this.broker.findNextActionEndpoint(name, opts);
if (endpoint instanceof Error) {
if (cb) cb(endpoint);
this.logger.error(endpoint);
}
try {
this.logger.debug("Call action:", name, params, opts);
const result = await this.broker.call(name, params, opts);
if (cb) cb(result);
else
this.logger.warn(
`Event received '${eventName}' without callback function`
);
} catch (e) {
if (cb) cb(e);
this.logger.error(e);
}
break;
}
case "event": {
// io.emit("event.name", { params })
// io.emit("event.name")
try {
this.logger.debug("Call event:", name, params, opts);
await this.broker.emit(name, params, opts);
} catch (e) {
if (cb) cb(e);
this.logger.error(e);
}
break;
}
default: {
const err = new MoleculerClientError(
"Unknown type event",
401,
"ERR_WS_TYPE_EVENT"
);
if (cb) cb(err);
this.logger.error(err);
}
}
} catch (e) {
this.logger.error(e);
if (cb) cb(e);
}
});
});
this.logger.info("Socket.io API Gateway started.");
},
async stopped() {
this.logger.info("Socket.io API Gateway stopped.");
if (this.io) {
return this.io.disconnectSockets(true);
}
}
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment