Skip to content

Instantly share code, notes, and snippets.

@artemis15
Forked from yekver/prom-client_pm2_cluster.js
Created August 27, 2021 11:48
Show Gist options
  • Save artemis15/0d1c022d356ccafdae33e6db9865e08a to your computer and use it in GitHub Desktop.
Save artemis15/0d1c022d356ccafdae33e6db9865e08a to your computer and use it in GitHub Desktop.
Unlike the `cluster` module, `pm2` gives no direct access to the master process. To return metrics for the whole cluster, the instance that receives the scrape request makes IPC calls to the other instances and waits until all of them have sent their locally collected metrics. Finally, it aggregates all received metrics.
const prom = require('prom-client');
const pm2 = require('pm2');
// pm2 message bus; launched lazily by the first aggregated scrape and reused afterwards.
let pm2Bus;
// IPC topic of the request asking a neighbour instance for its prom-client registry.
const REQ_TOPIC = 'get_prom_register';
/**
 * Promisified call of a callback-style pm2 API method.
 *
 * @param {string} cmd - name of the pm2 method to invoke (e.g. 'list', 'connect').
 * @param {...*} args - arguments passed to the method before the trailing callback.
 * @returns {Promise<*>} resolves with the callback's result, rejects with its error.
 */
function pm2exec(cmd, ...args) {
  return new Promise((resolve, reject) => {
    const callback = (err, resp) => {
      if (err) {
        reject(err);
        return;
      }
      resolve(resp);
    };
    pm2[cmd](...args, callback);
  });
}
/**
 * Keeps only the process descriptors whose pm2 status is 'online'.
 *
 * @param {Array<Object>} instancesData - process list as returned by `pm2.list()`.
 * @returns {Array<Object>} the online subset, in the original order.
 */
function getOnlineInstances(instancesData) {
  const isOnline = instance => instance.pm2_env.status === 'online';
  return instancesData.filter(isOnline);
}
/**
 * Builds a dedicated registry holding pm2's per-process statistics as gauges,
 * one series per (process name, pm_id) label pair.
 *
 * Values read from `pm2_env.axm_monitor` are skipped when the probe is missing
 * or not numeric; a genuine reading of 0 is kept.
 *
 * @param {Array<Object>} instancesData - process list as returned by `pm2.list()`.
 * @returns {prom.Registry} registry containing the gauges declared below.
 */
function getMainMetricsRegister(instancesData) {
  // don't use prom.register here because these metrics
  // will be summed in cluster mode!
  const registry = new prom.Registry();
  const mainMetrics = [
    { name: 'up', help: 'Is the process running' },
    { name: 'cpu', help: 'Process cpu usage' },
    { name: 'memory', help: 'Process memory usage' },
    { name: 'heap_size', help: 'Process heap size' },
    { name: 'used_heap_size', help: 'Process heap usage' },
    { name: 'uptime', help: 'Process uptime' },
    { name: 'instances', help: 'Process instances' },
    { name: 'restarts', help: 'Process restarts' },
    { name: 'loop_delay', help: 'Event Loop Latency' },
    { name: 'loop_delay_p95', help: 'Event Loop Latency p95' },
  ].reduce((acc, { name, help }) => {
    acc[name] = new prom.Gauge({
      name,
      help,
      labelNames: ['name', 'instance'],
      registers: [registry],
    });
    return acc;
  }, {});

  // Parses `axm[key].value` as a number; null means "no reading".
  // Unlike `parseFloat(...) || null`, a real reading of 0 is preserved.
  const readAxmNumber = (axm, key) => {
    const entry = axm[key];
    if (!entry) {
      return null;
    }
    const parsed = parseFloat(entry.value);
    return Number.isNaN(parsed) ? null : parsed;
  };

  instancesData.forEach(({ name, pm2_env, monit }) => {
    const conf = {
      name,
      instance: pm2_env.pm_id,
    };
    // A process may expose no axm_monitor probes at all; without this
    // fallback a single such process made the whole scrape throw.
    const axm = pm2_env.axm_monitor || {};
    const values = {
      up: pm2_env.status === 'online' ? 1 : 0,
      cpu: monit.cpu,
      memory: monit.memory,
      heap_size: readAxmNumber(axm, 'Heap Size'),
      used_heap_size: readAxmNumber(axm, 'Used Heap Size'),
      uptime: Math.round((Date.now() - pm2_env.pm_uptime) / 1000),
      instances: pm2_env.instances || 1,
      restarts: pm2_env.unstable_restarts + pm2_env.restart_time,
      loop_delay: readAxmNumber(axm, 'Event Loop Latency'),
      loop_delay_p95: readAxmNumber(axm, 'Event Loop Latency p95'),
    };
    Object.entries(values).forEach(([metricName, value]) => {
      if (value !== null) {
        mainMetrics[metricName].set(conf, value);
      }
    });
  });
  return registry;
}
/**
 * Asks every other listed instance, over pm2 IPC, to send its registry to
 * this instance.
 *
 * @param {Array<Object>} instancesData - process descriptors carrying `pm_id`.
 * @param {{count: number}} instancesToWait - shared counter; decremented for
 *   each instance the request could not be delivered to.
 */
function requestNeighboursData(instancesData, instancesToWait) {
  const selfId = Number(process.env.pm_id);
  const request = { topic: REQ_TOPIC, data: { targetInstanceId: selfId } };
  const neighbours = Object.values(instancesData).filter(({ pm_id }) => pm_id !== selfId);
  for (const { pm_id } of neighbours) {
    pm2exec('sendDataToProcessId', pm_id, request).catch(err => {
      instancesToWait.count--;
      console.error(`Failed to request metrics from instance #${pm_id}: ${err.message}`);
    });
  }
}
/**
 * Builds this instance's complete registry: the pm2 per-process gauges merged
 * with the metrics held in prom-client's default registry.
 *
 * @param {Array<Object>} instancesData - process list passed to getMainMetricsRegister.
 * @returns {*} the registry produced by `prom.AggregatorRegistry.aggregate`.
 */
function getCurrentRegistry(instancesData) {
  const processMetrics = getMainMetricsRegister(instancesData).getMetricsAsJSON();
  const appMetrics = prom.register.getMetricsAsJSON();
  return prom.AggregatorRegistry.aggregate([processMetrics, appMetrics]);
}
/**
 * Collects the metrics of every online pm2 instance and merges them into one
 * registry.
 *
 * The current instance contributes its own registry directly. Every other
 * online instance is asked over IPC (requestNeighboursData) and replies on
 * the pm2 bus under the event `process:<this pm_id>`. The result resolves
 * once every expected instance has answered, or 1s after the last reply,
 * whichever comes first.
 *
 * @param {Array<Object>} instancesData - process list as returned by `pm2.list()`.
 * @returns {Promise<*>} the registry produced by `prom.AggregatorRegistry.aggregate`.
 * @throws Rejects if the local registry cannot be built or the pm2 bus cannot
 *   be launched.
 */
async function getAggregatedRegistry(instancesData) {
  const onlineInstances = getOnlineInstances(instancesData);
  // Shared with requestNeighboursData, which decrements `count` for every
  // instance the request could not be delivered to.
  const instancesToWait = { count: onlineInstances.length };
  const instanceId = Number(process.env.pm_id);
  const eventName = `process:${instanceId}`;
  const expectedIds = new Set(onlineInstances.map(({ pm_id }) => pm_id));

  // Sparse array indexed by pm_id.
  const registersList = [];
  registersList[instanceId] = getCurrentRegistry(instancesData).getMetricsAsJSON();

  // The bus must exist before the listener is registered, and the listener
  // before requests are sent, or fast replies are lost. Awaiting here rather
  // than inside a Promise executor keeps that order even on the first call,
  // when the bus still has to be launched.
  if (!pm2Bus) {
    pm2Bus = await pm2exec('launchBus');
  }

  const registryPromise = new Promise(resolve => {
    // Distinct responders, so a duplicate or late reply is never counted twice.
    const responded = new Set([instanceId]);
    let timeoutId;
    let settled = false;

    function sendResult() {
      if (settled) {
        return;
      }
      settled = true;
      clearTimeout(timeoutId);
      pm2Bus.off(eventName);
      resolve(prom.AggregatorRegistry.aggregate(registersList));
    }

    function kickNoResponseTimeout() {
      clearTimeout(timeoutId);
      timeoutId = setTimeout(() => {
        console.warn(
          `Metrics were sent by timeout. No response from ${instancesToWait.count -
            responded.size} instances.`,
        );
        sendResult();
      }, 1000);
    }

    pm2Bus.on(eventName, packet => {
      const { instanceId: senderId, register } = packet.data;
      // Ignore replies from processes that are not online in this round.
      if (!expectedIds.has(senderId)) {
        return;
      }
      registersList[senderId] = register;
      responded.add(senderId);
      if (responded.size >= instancesToWait.count) {
        sendResult();
      } else {
        kickNoResponseTimeout();
      }
    });

    // Nothing to wait for when this instance is the only one expected.
    if (responded.size >= instancesToWait.count) {
      sendResult();
    } else {
      kickNoResponseTimeout();
    }
  });

  requestNeighboursData(onlineInstances, instancesToWait);
  return registryPromise;
}
// Listener: answers REQ_TOPIC requests from the instance serving the scrape by
// sending this instance's default-registry metrics as a `process:<target id>`
// message, which the requesting instance listens for on the pm2 bus.
process.on('message', function onPromRegisterRequest(packet) {
  if (packet.topic !== REQ_TOPIC) {
    return;
  }
  const reply = {
    type: `process:${packet.data.targetInstanceId}`,
    data: {
      instanceId: Number(process.env.pm_id),
      register: prom.register.getMetricsAsJSON(),
    },
  };
  process.send(reply);
});
// Connect this process to the pm2 daemon once, at module load. Nothing awaits
// this promise, so a failure must be handled here: with Node's default
// unhandled-rejection mode (Node >= 15) it would otherwise terminate the process.
pm2exec('connect').catch(err => {
  console.error(`Failed to connect to pm2: ${err.message}`);
});
module.exports = async (req, res) => {
let responceData;
try {
const instancesData = await pm2exec('list');
const register =
getOnlineInstances(instancesData).length > 1
? await getAggregatedRegistry(instancesData)
: getCurrentRegistry(instancesData);
responceData = register.metrics();
} catch (err) {
console.error(`Failed to get metrics: ${err.message}`);
} finally {
res.set('Content-Type', prom.register.contentType);
res.end(responceData);
}
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment