Skip to content

Instantly share code, notes, and snippets.

@saqibarfeen
Created January 26, 2022 13:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save saqibarfeen/8bc729ade1c1b17fad764d924e64af32 to your computer and use it in GitHub Desktop.
Save saqibarfeen/8bc729ade1c1b17fad764d924e64af32 to your computer and use it in GitHub Desktop.
import pm2Cb from "pm2";
import bluebird from "bluebird";
import express from "express";
/*
 * Promisified view of the callback-based PM2 API: Bluebird's promisifyAll adds
 * an *Async variant of each method (this file uses listAsync, launchBusAsync
 * and sendDataToProcessIdAsync). The result is cast to `any`, so calls made
 * through `pm2` are not type-checked.
 */
const pm2 = bluebird.promisifyAll(pm2Cb) as unknown as any;
import * as promClient from "prom-client";
import { v4 as uuidv4 } from "uuid";
/**
 * Maximum time, in ms, the master waits for worker responses before it
 * aggregates whatever registers have arrived so far.
 */
const TIMEOUT = 2000;
/** The global message topic for gathering Prometheus metrics */
const TOPIC = "get_prom_register";
/**
 * Singleton instance of the PM2 message bus. Left unassigned at load time and
 * assigned lazily by the /metrics handler the first time aggregation is needed.
 */
let pm2Bus;
/**
 * Numeric id of the current process, read from the `pm_id` environment
 * variable (presumably set by PM2 - confirm). Evaluates to NaN when the
 * variable is not set.
 */
const instanceId = Number(process.env.pm_id);
/**
 * Info returned by pm2.list() for one process. Only the field this file
 * actually reads is declared here.
 */
interface PM2InstanceData {
/** PM2 process id; compared against this process's own instanceId */
pm_id: number;
}
/** Response packet sent to the master instance */
interface ResponsePacket {
/** Bus event name the master listens on, built as `process:<master instance id>` */
type: string;
data: {
/** ID of the instance that produced this response */
instanceId: number;
/** The sender's metrics as returned by promClient.register.getMetricsAsJSON() */
register: any;
/** Set to true by the sender; not inspected by the receiving side in this file */
success: boolean;
/** Echo of the request ID, used by the master to match responses to a request */
reqId: string;
};
}
/** IPC request packet sent from the master instance to the others */
interface RequestPacket {
/** Message topic; must match the TOPIC constant for a worker to respond */
topic: "get_prom_register";
data: {
/** ID of the master instance (the one the response must be addressed to) */
targetInstanceId: number;
/** Unique request ID to prevent collisions from multiple requests */
reqId: string;
};
}
/**
 * IPC listener installed in every process: when a packet carrying the metrics
 * TOPIC arrives, reply with this process's Prometheus metrics, addressed to the
 * instance named in the request. Packets with any other topic are ignored.
 * Failures are logged and never propagate out of the listener.
 */
process.on("message", async (packet: RequestPacket) => {
  try {
    // Guard clause: not a metrics request, nothing to do.
    if (packet.topic !== TOPIC) {
      return;
    }
    const send = process.send;
    const replyType = `process:${packet.data.targetInstanceId}`;
    const metricsJson = await promClient.register.getMetricsAsJSON();
    const reply = {
      type: replyType,
      data: {
        instanceId,
        register: metricsJson,
        success: true,
        reqId: packet.data.reqId
      }
    } as ResponsePacket;
    send.call(process, reply);
  } catch (e) {
    console.error("Error sending metrics to master node", e);
  }
});
/**
 * Sends the metrics request packet to every PM2 instance except this one.
 *
 * A failed send is logged with console.error and swallowed, so one
 * unreachable instance does not fail the batch.
 *
 * @param instancesData Process list to fan the request out to.
 * @param reqId Unique ID of this request, echoed back by each responder.
 * @returns A promise that settles once every send call has settled. The
 *          metrics themselves arrive separately as bus events.
 */
async function requestNeighboursData(instancesData: PM2InstanceData[], reqId: string) {
  const outgoing: RequestPacket = {
    topic: TOPIC,
    data: { targetInstanceId: instanceId, reqId }
  };
  const pendingSends = [];
  for (const { pm_id: neighbourId } of Object.values(instancesData)) {
    // never message ourselves
    if (neighbourId === instanceId) {
      continue;
    }
    const delivery = pm2.sendDataToProcessIdAsync(neighbourId, outgoing);
    pendingSends.push(delivery.catch(e => console.error(e)));
  }
  return bluebird.all(pendingSends);
}
/**
 * Master process gathering aggregated metrics data.
 *
 * Collects this process's own metrics, asks every other instance for theirs
 * and resolves with the aggregated registry once every instance has answered
 * or TIMEOUT ms have elapsed, whichever comes first. On timeout only the
 * registers received so far are aggregated.
 *
 * Requires the module-level `pm2Bus` to be initialised by the caller.
 *
 * @param instancesData Process list as returned by pm2.list().
 * @returns The aggregated registry, or undefined when instancesData is empty.
 *          Rejects if collecting the local metrics, handling a response or
 *          aggregating fails.
 */
async function getAggregatedRegistry(instancesData: PM2InstanceData[]) {
  if (!instancesData || !instancesData.length) {
    return;
  }
  // assigning a unique request ID
  const reqId = uuidv4();
  const instancesCount = instancesData.length;
  const busEventName = `process:${instanceId}`;
  // Indexed by instance id, so the array may be sparse.
  const registersPerInstance: object[] = [];

  // Master process metrics. Awaited BEFORE the promise below is constructed so
  // that (a) a failure here rejects the returned promise instead of being lost
  // inside an async executor and (b) the executor can run fully synchronously,
  // guaranteeing the listener exists before any request is sent.
  registersPerInstance[instanceId] = await promClient.register.getMetricsAsJSON();
  let registersReady = 1;

  const registryPromise = new bluebird<any>((resolve, reject) => {
    let settled = false;
    let timeout: ReturnType<typeof setTimeout> | undefined;

    // Runs `action` at most once, after releasing the timer and the listener.
    const settle = (action: () => void) => {
      if (settled) {
        return;
      }
      settled = true;
      clearTimeout(timeout);
      // deregister event listener to prevent memory leak
      // NOTE(review): this is called with the event name only, so it presumably
      // drops every listener for that event, including those of concurrent
      // requests, which would then fall back to their timeout - confirm
      // against the bus API.
      pm2Bus.off(busEventName);
      action();
    };

    const finish = () =>
      settle(() => {
        try {
          resolve(promClient.AggregatorRegistry.aggregate(registersPerInstance));
        } catch (e) {
          reject(e);
        }
      });

    /** Listens to the other instances' responses */
    pm2Bus.on(busEventName, (packet: ResponsePacket) => {
      try {
        // ignore responses that belong to a different request
        if (packet.data.reqId !== reqId) {
          return;
        }
        const senderId = packet.data.instanceId;
        // count each instance once, even if it answers more than once
        if (registersPerInstance[senderId] === undefined) {
          registersReady++;
        }
        registersPerInstance[senderId] = packet.data.register;
        if (registersReady >= instancesCount) {
          // resolve when all responses have been received
          finish();
        }
      } catch (e) {
        settle(() => reject(e));
      }
    });

    // safety timeout: resolve with partial data if some instance never answers
    timeout = setTimeout(finish, TIMEOUT);

    // nothing to wait for when this process is the only instance
    if (registersReady >= instancesCount) {
      finish();
    }
  });

  // The listener is registered at this point. We are not awaiting the sends:
  // resolution is handled by the bus event listener.
  requestNeighboursData(instancesData, reqId).catch(e => console.error(e));
  return registryPromise;
}
/**
 * Main /metrics handler function.
 *
 * With more than one PM2 process running, responds with metrics aggregated
 * across all of them; otherwise responds with this process's own metrics.
 * Any failure is forwarded to Express through `next` so that the application's
 * error middleware produces the response.
 *
 * @param req Incoming request (unused).
 * @param res Response the metrics text is written to.
 * @param next Express continuation; called with the error on failure.
 */
export default async function clusterMetrics(
  req: express.Request,
  res: express.Response,
  next: express.NextFunction
) {
  try {
    // get current instances (threads) data
    const instancesData = await pm2.listAsync();
    let body: string;
    if (instancesData.length > 1) {
      // create or use bus singleton
      pm2Bus = pm2Bus || (await pm2.launchBusAsync());
      // multiple threads - aggregate
      const register = await getAggregatedRegistry(instancesData);
      body = await register.metrics();
    } else {
      // 1 thread - send local stats
      body = await promClient.register.metrics();
    }
    // Set the Prometheus content type only once the payload exists, so an
    // error response produced further down the chain does not carry it.
    res.setHeader("Content-Type", promClient.register.contentType);
    res.end(body);
  } catch (e) {
    // Forward the original error (with its stack) instead of throwing from an
    // async handler, which leaves the promise rejected and the request unanswered.
    next(e);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment