// Gist saqibarfeen/8bc729ade1c1b17fad764d924e64af32, created January 26, 2022 13:38.
// Note: GitHub flagged this file as containing bidirectional Unicode text that may be
// interpreted or compiled differently than it appears; review it in an editor that
// reveals hidden Unicode characters.
import pm2Cb from "pm2";
import bluebird from "bluebird";
import express from "express";
import * as promClient from "prom-client";
import { v4 as uuidv4 } from "uuid";
// Bluebird's promisifyAll adds *Async variants of the callback-based PM2 API methods.
const pm2 = bluebird.promisifyAll(pm2Cb) as unknown as any;
/** Maximum time, in ms, to wait for the other instances before aggregating what has arrived. */
const TIMEOUT = 2000;
/** The global message topic for gathering Prometheus metrics. */
const TOPIC = "get_prom_register";
/**
 * Singleton instance of the PM2 message bus. Starts out undefined and is
 * created lazily by the /metrics handler on the first multi-instance request.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- the promisified PM2 API is untyped here
let pm2Bus: any;
/** This process's PM2 id, parsed from the `pm_id` environment variable (NaN when it is unset). */
const instanceId = Number(process.env.pm_id);
/** Subset of the process description returned by pm2.list() that this module reads. */
interface PM2InstanceData {
  /** PM2 process id of the instance. */
  pm_id: number;
}

/** Response packet sent to the master instance. */
interface ResponsePacket {
  /** Bus event name; responders set it to `process:<targetInstanceId>`. */
  type: string;
  data: {
    /** PM2 id of the responding instance. */
    instanceId: number;
    /** The responder's metrics, as returned by promClient.register.getMetricsAsJSON(). */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- passed through to prom-client as-is
    register: any;
    /** Set to true by responders that managed to collect their metrics. */
    success: boolean;
    /** Echo of the request ID, used by the master to match responses to requests. */
    reqId: string;
  };
}

/** IPC request packet sent from the master instance to the others. */
interface RequestPacket {
  topic: "get_prom_register";
  data: {
    /** ID of the master instance (the one that must receive the response). */
    targetInstanceId: number;
    /** Unique request ID to prevent collisions from multiple requests. */
    reqId: string;
  };
}
/**
 * Every process listens on the IPC channel for the metric request TOPIC,
 * responding with its own Prometheus metrics when asked. The response is
 * addressed to the requesting instance via the `process:<id>` packet type.
 */
process.on("message", async (packet: RequestPacket) => {
  // Unrelated or malformed IPC messages are not metric requests: ignore them
  // rather than throwing and logging a misleading "sending metrics" error.
  if (!packet || packet.topic !== TOPIC || !packet.data) {
    return;
  }
  try {
    const register = await promClient.register.getMetricsAsJSON();
    // process.send is only defined when the process has an IPC channel.
    if (typeof process.send !== "function") {
      throw new Error("process.send is unavailable (no IPC channel)");
    }
    const response: ResponsePacket = {
      type: `process:${packet.data.targetInstanceId}`,
      data: {
        instanceId,
        register,
        success: true,
        reqId: packet.data.reqId
      }
    };
    process.send(response);
  } catch (e) {
    console.error("Error sending metrics to master node", e);
  }
});
/**
 * Sends the metrics request packet to every PM2 instance except this one.
 *
 * @param instancesData Process list returned by pm2.list().
 * @param reqId Unique ID of this metrics request, echoed back by responders.
 * @returns A promise that settles once every send attempt has completed. It
 *   does NOT wait for the responses; those arrive separately on the PM2 bus.
 *   Individual send failures are logged and do not reject the result.
 */
async function requestNeighboursData(instancesData: PM2InstanceData[], reqId: string) {
  const requestData: RequestPacket = {
    topic: TOPIC,
    data: {
      targetInstanceId: instanceId,
      reqId
    }
  };
  const sends = instancesData
    // don't send message to self
    .filter(({ pm_id }) => pm_id !== instanceId)
    .map(({ pm_id }) =>
      pm2
        .sendDataToProcessIdAsync(pm_id, requestData)
        .catch((e: unknown) => console.error(e))
    );
  // Resolves when all requests have been dispatched (or have failed and been logged)
  return bluebird.all(sends);
}
/**
 * Master process gathering aggregated metrics data.
 *
 * Collects this process's own metrics, asks every other PM2 instance for
 * theirs, and aggregates whatever has been received once all instances have
 * answered or TIMEOUT ms have elapsed, whichever comes first.
 *
 * @param instancesData Process list returned by pm2.list().
 * @returns The aggregated registry, or undefined when instancesData is empty.
 */
async function getAggregatedRegistry(instancesData: PM2InstanceData[]) {
  if (!instancesData || !instancesData.length) {
    return;
  }
  // assigning a unique request ID
  const reqId = uuidv4();
  const instancesCount = instancesData.length;
  const busEventName = `process:${instanceId}`;
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- metric JSON is passed through untouched
  const registersPerInstance: any[] = [];
  // Master process metrics. Collected BEFORE the listener promise is created so
  // that (a) a failure here rejects this function instead of leaving a promise
  // pending forever, and (b) the promise executor below stays synchronous and
  // the bus listener is really in place before any request is sent.
  registersPerInstance[instanceId] = await promClient.register.getMetricsAsJSON();

  const registryPromise = new bluebird<any>((resolve, reject) => {
    let registersReady = 1;
    // Stops the safety timer and deregisters the listener to prevent a memory leak.
    // NOTE(review): off() is given only the event name; if the bus removes every
    // listener for that event, overlapping /metrics requests would lose their
    // listener and fall back to the timeout — confirm against the PM2 bus API.
    const cleanup = () => {
      clearTimeout(timeout);
      pm2Bus.off(busEventName);
    };
    const finish = () => {
      cleanup();
      try {
        resolve(promClient.AggregatorRegistry.aggregate(registersPerInstance));
      } catch (e) {
        // aggregate() can run from the timer callback; surface its failure
        // through the promise instead of as an uncaught exception.
        reject(e);
      }
    };
    // Safety timeout: aggregate what we have if some instance never answers.
    const timeout = setTimeout(finish, TIMEOUT);
    /** Listens to the other instances' responses */
    pm2Bus.on(busEventName, (packet: ResponsePacket) => {
      try {
        if (packet.data.reqId === reqId) {
          const responderId = packet.data.instanceId;
          // Count each instance once so a duplicate response cannot make us
          // finish before every instance has answered.
          if (registersPerInstance[responderId] === undefined) {
            registersReady++;
          }
          registersPerInstance[responderId] = packet.data.register;
          if (registersReady === instancesCount) {
            // resolve when all responses have been received
            finish();
          }
        }
      } catch (e) {
        cleanup();
        reject(e);
      }
    });
  });
  // Request instance data now that the response listener has been set up.
  // Not awaited: resolution is handled by the bus event listener, and send
  // failures are logged inside requestNeighboursData.
  void requestNeighboursData(instancesData, reqId);
  return registryPromise;
}
/**
 * Main /metrics handler function.
 *
 * Responds with the local Prometheus metrics when PM2 reports a single
 * process, and with metrics aggregated across all instances otherwise.
 * Failures are forwarded to Express through `next` so the regular error
 * middleware handles them.
 *
 * @param req Incoming request (not inspected).
 * @param res Response the metrics text is written to.
 * @param next Express continuation, called with the error on failure.
 */
export default async function clusterMetrics(
  req: express.Request,
  res: express.Response,
  next: express.NextFunction
): Promise<void> {
  try {
    // get current instances (threads) data
    const instancesData = await pm2.listAsync();
    let body: string;
    if (instancesData.length > 1) {
      // create or use bus singleton
      pm2Bus = pm2Bus || (await pm2.launchBusAsync());
      // multiple threads - aggregate
      const register = await getAggregatedRegistry(instancesData);
      body = await register.metrics();
    } else {
      // 1 thread - send local stats
      body = await promClient.register.metrics();
    }
    // Set the metrics content type only once we actually have metrics, so an
    // error response is not labelled with it.
    res.setHeader("Content-Type", promClient.register.contentType);
    res.end(body);
  } catch (e) {
    // Hand the original error (with its stack) to Express instead of rethrowing
    // a stringified copy from an async handler, which would go unhandled.
    next(e);
  }
}
// End of gist. (The remaining page text was GitHub's sign-up / sign-in prompt for commenting.)