@Jerska
Last active November 7, 2017 17:33
Custom librato client with Node.js

Goals

The goal here was to solve our issues with Librato, using only Node.js, on a distributed system on Heroku. Here's the list of our issues / questions:

  • we loved how easy the log drain was to use
  • but we sent too much data to use the log drain w/ Heroku, so some of it was skipped and the data was inconsistent
  • we had multiple machines updating the same counters (e.g. # of concurrent jobs) and we wanted a rate (concurrent jobs / s across all machines): you can't do this using counters
  • librato-node uses counters internally when you call increment
  • using librato-node in a dev environment requires you to wrap it
  • making a request to the API every 15s would induce performance issues
  • [Bonus] Configuring every single metric in the interface was painful

I've spent a good deal of time on live-chat with Greg of Librato. He's been incredibly helpful in getting my head around how SSA works and how to make our use case work. :)

So I ended up with this small piece of code to try to fix all of those. It's obviously opinionated and shouldn't be seen as a lib, more as a reusable block that you should modify depending on your needs.

Explanation

Every single gauge will have SSA (Service-Side Aggregation) enabled, allowing you to have multiple machines updating the same metric.

  • When you want an average (timing, load of machines, ...), add the metric to your graph and select average of averages
  • When you want a rate (counter / s), add the metric to your graph, select average of sums and add x / p as a formula to get the rate per second
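As a rough illustration of the two graph settings above, the arithmetic for a single period can be sketched in a few lines of JavaScript. The machine names and values are made up, and the functions only model the aggregation described here; they are not Librato's implementation.

```javascript
// Hypothetical values reported by three machines for one 15 s period.
const PERIOD_SECONDS = 15; // "p" in the x / p formula

// .measure-style metric: each machine reports its own average load.
const loadAverages = { 'worker-1': 40, 'worker-2': 60, 'worker-3': 80 };
// .count-style metric: each machine reports how many jobs it ran.
const jobCounts = { 'worker-1': 12, 'worker-2': 8, 'worker-3': 10 };

const sum = values => values.reduce((a, b) => a + b, 0);

// Averages across machines: mean of the per-machine averages.
function averageAcrossMachines(perMachine) {
  const values = Object.values(perMachine);
  return sum(values) / values.length;
}

// Sums across machines, then x / p: total divided by the period in seconds.
function ratePerSecond(perMachine, periodSeconds) {
  return sum(Object.values(perMachine)) / periodSeconds;
}

console.log(averageAcrossMachines(loadAverages)); // 60
console.log(ratePerSecond(jobCounts, PERIOD_SECONDS)); // 2
```

With these made-up numbers, the three machines ran 30 jobs in 15 seconds, hence a rate of 2 jobs / s across all machines.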

Features

  • .count(name, value[, unit]) which is a counter

  • .measure(name, value[, unit]) which is a normal gauge

  • the period can be smaller than the uploading period (by an integer ratio). For instance, we're using a period of 15s and send the data every 2 minutes.

  • dev friendly: will simply log if the ENV variables aren't set

  • Extra points: this format is also compatible with the Heroku log drain - if at any point you have an issue, just unset LIBRATO_EMAIL from your env and the logs should take over

  • automatic configuration of your metrics every time the system is about to push one it has not seen since the process started:

    • it will compute a pretty name from your metric name
    • it will set units if you've passed them in your calls to .count or .measure
    • it will set the period of the metric to the period in the file: change it once, let it propagate everywhere
    • it will always set display_min to 0, since it's rarely relevant to see only fluctuations
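To make the .count / .measure semantics and the dev-friendly output concrete, here is a minimal standalone sketch. `createDevRecorder` and `flushLines` are hypothetical names used only for this illustration, and the metric names are made up; the aggregation (sum for counts, average for measures) and the `call#name=value` line format mirror the client code further down.

```javascript
// Standalone model of one flush when LIBRATO_EMAIL / LIBRATO_TOKEN are unset.
function createDevRecorder() {
  const calls = {};
  const measures = {};

  function record(call, name, value) {
    calls[name] = call;
    if (!measures[name]) measures[name] = [];
    measures[name].push(value);
  }

  return {
    count: (name, value) => record('count', name, value),
    measure: (name, value) => record('measure', name, value),
    // Returns the lines a flush would log, then empties the buffers.
    flushLines() {
      const lines = Object.keys(measures).map(name => {
        const vals = measures[name];
        const sum = vals.reduce((a, b) => a + b, 0);
        // Counters are summed over the period, gauges are averaged.
        const val = calls[name] === 'count' ? sum : sum / vals.length;
        return `${calls[name]}#${name}=${val}`;
      });
      Object.keys(measures).forEach(name => delete measures[name]);
      return lines;
    },
  };
}

const recorder = createDevRecorder();
recorder.count('my-project.jobs.done', 1);
recorder.count('my-project.jobs.done', 2);
recorder.measure('my-project.jobs.duration', 100);
recorder.measure('my-project.jobs.duration', 300);

const lines = recorder.flushLines();
lines.forEach(line => console.log(line));
// count#my-project.jobs.done=3
// measure#my-project.jobs.duration=200
```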

Warnings / Not supported

This works with the sources version of Librato, and is untested with the tags one. When on Heroku, you are necessarily using the source version. If you're unsure, see here: https://www.librato.com/docs/kb/faq/account_questions/tags_or_sources/ .

The automatic setup of a metric will be launched by each process that pushes this metric for the first time. For us, the convenience was worth the N extra API calls made every time the processes are restarted, but that's something you should be aware of.
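What each process sends the first time it sees a metric can be sketched as follows. `buildMetricDefinition` and `needsSetup` are hypothetical helpers written for this illustration, and the metric name is made up; the fields and the display-name transformation are copied from the client code further down.

```javascript
// Same transformation as getDisplayName in the client: drop the first
// dot-separated prefix, turn separators into spaces, capitalize.
function getDisplayName(name) {
  const unprefixed = name.replace(/^[^.]*\./, '');
  const spaced = unprefixed.replace(/[-._]/g, ' ');
  return spaced.charAt(0).toUpperCase() + spaced.slice(1);
}

// Hypothetical helper: builds the JSON body of the PUT /metrics/<name> call.
function buildMetricDefinition(name, unit, periodMs) {
  return {
    type: 'gauge',
    name,
    display_name: getDisplayName(name),
    period: Math.floor(periodMs / 1000), // the API call uses seconds
    attributes: {
      display_min: 0,
      aggregate: true, // the attribute the client sets so that SSA is used
      display_units_short: unit,
      display_units_long: unit,
    },
  };
}

// Per-process memory of the metrics already configured: this is why
// N processes end up making N setup calls for the same metric.
const metricsSeen = new Set();
function needsSetup(name) {
  if (metricsSeen.has(name)) return false;
  metricsSeen.add(name);
  return true;
}

const metricName = 'my-project.jobs_duration'; // hypothetical metric name
if (needsSetup(metricName)) {
  const definition = buildMetricDefinition(metricName, 'ms', 15000);
  console.log(definition.display_name, definition.period); // Jobs duration 15
}
console.log(needsSetup(metricName)); // false
```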

We're not handling min / max. This can easily be added.

However, you shouldn't add sum and count for counters, or you will break the counting across all machines. Indeed, when you select average of sums, Librato's SSA will take the sum (across all the machines) of the sums (of each machine). This is why this works: each machine sends a single value, so its sum == its average. You end up with an average of sums of averages, or more simply a sum (across machines) of averages == counts (on each machine).

// fetch.js (loaded as './fetch' by the Librato client below)
// Most options are not needed here, this is just a wrapper we use for other things in our project
const request = require('request-promise-native');

module.exports = async function fetch({ url, requestOptions = {} }) {
  const defaultRequestOptions = {
    headers: {
      'User-Agent': `My project`,
    },
    // would otherwise resolve with only the body
    resolveWithFullResponse: true,
    gzip: true,
    // Only raise errors when failed for technical reasons,
    // see https://www.npmjs.com/package/request-promise#api-in-detail
    simple: false,
    forever: true,
  };
  const { statusCode, body, headers } = await request(url, {
    ...defaultRequestOptions,
    ...requestOptions,
  });
  return {
    statusCode,
    body,
    headers
  };
};
// Librato client (a separate file from the wrapper above, which it loads as './fetch')
const fetch = require('./fetch');

function createClient() {
  const PERIOD = 15000; // Aggregation period, in ms
  const PUSH_PERIOD_RATIO = 8; // Actually send the measurements only once every 8 periods (2 min)
  const PUSH_TIMEOUT = 10000;
  const ENDPOINT = 'https://metrics-api.librato.com/v1';

  const email = process.env.LIBRATO_EMAIL;
  const token = process.env.LIBRATO_TOKEN;
  const source = process.env.WORKER_NAME || 'my-project';

  let batch = [];
  let calls = {};
  let measures = {};
  let funcs = {};
  let units = {};
  const metricsSeen = [];

  const requestOptions = {
    headers: {
      // Buffer.from replaces the deprecated `new Buffer(string)` constructor
      Authorization: `Basic ${Buffer.from(`${email}:${token}`).toString(
        'base64'
      )}`,
      'User-Agent': `Algolia Custom Librato`,
    },
    timeout: PUSH_TIMEOUT,
  };

  /* Worker */
  // Next timestamp (in ms) that is a multiple of `period`
  function getStartTime(period) {
    const now = Date.now();
    return now + (period - now % period);
  }

  let timerId = null;
  function start(period = PERIOD) {
    let nextRun = getStartTime(period);
    let nextSave = getStartTime(period * PUSH_PERIOD_RATIO);
    const workFn = () => {
      for (;;) {
        const now = Date.now();
        if (now >= nextRun) {
          // flush is async: catch rejections so that a failed push
          // does not surface as an unhandled promise rejection
          flush(nextRun, period, now >= nextSave).catch(err => {
            console.error('[Librato] Flush failed', err);
          });
          while (nextRun <= now) {
            nextRun += period;
          }
          while (nextSave <= now) {
            nextSave += period * PUSH_PERIOD_RATIO;
          }
        } else {
          return (timerId = setTimeout(workFn, nextRun - now));
        }
      }
    };
    workFn();
  }

  function stop() {
    clearTimeout(timerId);
  }

  /* API */
  function count(name, value, unit = null) {
    calls[name] = 'count';
    funcs[name] = 'sum';
    units[name] = unit;
    if (!measures[name]) measures[name] = [];
    measures[name].push(value);
  }

  function measure(name, value, unit = null) {
    calls[name] = 'measure';
    funcs[name] = 'avg';
    units[name] = unit;
    if (!measures[name]) measures[name] = [];
    measures[name].push(value);
  }

  /* Flushing */
  function getValue(name) {
    const call = calls[name];
    const vals = measures[name];
    const func = funcs[name];
    if (vals.length === 0) return null;
    const sum = vals.reduce((a, b) => a + b, 0);
    const len = vals.length;
    if (func === 'sum') return sum;
    if (func === 'avg') return len > 0 ? sum / len : 0;
    throw new Error(
      `[Librato] Invalid aggregator given to librato.${call}: "${func}"`
    );
  }

  function print(name, val) {
    const call = calls[name];
    console.log(`${call}#${name}=${val}`);
  }

  async function updateMeasurements(now, values, shouldPush) {
    const measureTime = Math.floor(now / 1000);
    const currentGauges = Object.keys(values).map(name => ({
      name,
      value: values[name],
      measure_time: measureTime, // eslint-disable-line camelcase
    }));
    batch = batch.concat(currentGauges);
    if (!shouldPush) return;

    const json = {
      source,
      gauges: batch,
    };
    batch = [];
    const url = `${ENDPOINT}/metrics`;
    await fetch({
      url,
      requestOptions: {
        ...requestOptions,
        method: 'POST',
        json,
      },
    });
  }

  function getDisplayName(name) {
    const unprefixed = name.replace(/^[^.]*\./, '');
    const spaced = unprefixed.replace(/[-._]/g, ' ');
    return spaced.charAt(0).toUpperCase() + spaced.slice(1);
  }

  function updateMetrics(names, period) {
    return Promise.all(
      names.map(async name => {
        if (metricsSeen.indexOf(name) !== -1) return;
        metricsSeen.push(name);
        const unit = units[name];
        const metric = {
          type: 'gauge',
          name,
          display_name: getDisplayName(name), // eslint-disable-line camelcase
          period: Math.floor(period / 1000),
          attributes: {
            display_min: 0, // eslint-disable-line camelcase
            aggregate: true,
            display_units_short: unit, // eslint-disable-line camelcase
            display_units_long: unit, // eslint-disable-line camelcase
          },
        };
        const url = `${ENDPOINT}/metrics/${metric.name}`;
        await fetch({
          url,
          requestOptions: {
            ...requestOptions,
            method: 'PUT',
            json: metric,
          },
        });
      })
    );
  }
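
  async function flush(now, period, shouldPush) {
    const values = Object.keys(measures).reduce((res, name) => {
      const val = getValue(name);
      if (val === null) return res;
      res[name] = val; // eslint-disable-line no-param-reassign
      return res;
    }, {});

    let pending = null;
    if (email && token) {
      // Both calls read the buffers synchronously, before their first await
      pending = Promise.all([
        updateMeasurements(now, values, shouldPush),
        updateMetrics(Object.keys(values), period),
      ]);
    } else {
      Object.keys(values).forEach(name => {
        print(name, values[name]);
      });
    }

    // Reset the buffers before waiting on the network, so that measurements
    // recorded while the requests are in flight are kept for the next period
    calls = {};
    measures = {};
    funcs = {};
    units = {};

    await pending;
  }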

  return {
    start,
    stop,
    count,
    measure,
    flush,
  };
}

const client = createClient();
client.start();
module.exports = client;

@vvo

vvo commented Oct 24, 2017

If your library is better than the current one, why not just open-source it with real tests? I can help you with that (~1 day). That would greatly benefit the community and Librato users, along with strengthening the module.

Not high priority but still, interesting

@Jerska
Author

Jerska commented Nov 7, 2017

Missed this comment!
I'm just worried about feature requests on this @vvo .

This is open-source, so should be accessible too.
