Skip to content

Instantly share code, notes, and snippets.

@codemoran
Created September 9, 2013 06:24
Show Gist options
  • Save codemoran/6492102 to your computer and use it in GitHub Desktop.
Save codemoran/6492102 to your computer and use it in GitHub Desktop.
Graphdat Metrics collector that listens over UDP, batches up the messages using the Graphdat batch Metrics API. http://developer.graphdat.com/v1/post/measurements
var _dgram = require('dgram');
var _https = require('https');
// How often does the UDP server flush the message queue and send to Graphdat
// To keep the graphs flowing in real time, flush every 3s
var FLUSH_TIME = 3000;
// What port should the server start up on
var SERVER_PORT = 8900;
// Configuration options for contacting Graphdat
// Email is what you use to sign in with, API Token is from Account -> Settings
var CONFIG_EMAIL = '[YOUR_EMAIL]';
var CONFIG_APITOKEN = '[YOUR_API_TOKEN]';
// What output would you like to see displayed, helpful for debugging issues
var DEBUG_LOG_INVALID_MESSAGES = true;
var DEBUG_LOG_VALID_MESSAGES = false;
var DEBUG_LOG_SENT_MESSAGE_QUEUE = false;
var DEBUG_LOG_STATUS = true;
// Our queue
var hash = {};
var messageQueue = [];
// Our logger
var logger = console;
// Keep track of what is happening for reporting
var invalidMessages = 0;
var sentMessages = 0;
var validMessages = 0;
function formatDate(dateToFormat) { return dateToFormat.toISOString().replace(/T/, ' ').replace(/\..+/, ''); }
function getTime() { return Math.floor(Date.now()/1000); }
/* ===
A Graphdat message has the following format as an object
{
"source": "myserver",
"metric": "PAGE_FAULTS",
"measure": 2,
"timestamp": 1377043134
}
Or the following format as an array
[
"myserver",
"PAGE_FAULTS",
2,
1377043134
]
The Timestamp in both examples is optional, but is a good idea
as items will graph where you expect them that way.
Timestamps are in UNIX time format, so Date.now() / 1000
=== */
function validate(message, cb)
{
var obj;
if (Array.isArray(message))
{
obj = {
source: message[0],
metric: message[1],
measure: message[2],
timestamp: message[3]
};
}
else if (message !== null && typeof message === 'object')
{
obj = message;
}
else
{
return cb('message is not in the correct format');
}
if (!('source' in obj))
return cb('message is missing the `source` property');
if (!('metric' in obj))
return cb('message is missing the `metric` property');
if (!('measure' in obj))
return cb('message is missing the `measure` property');
if (!('timestamp' in obj))
obj.timestamp = getTime();
// using JS time and not UNIX time
if (obj.timestamp > 2000000000)
obj.timestamp = obj.timestamp/1000;
// floor the seconds as we have per second metrics
obj.timestamp = Math.floor(obj.timestamp);
return cb(null, obj);
}
/* ===
Graphdat provides 1 second intervals, so if you are sending data at sub second intervals,
you are paying for data that you cannot see. If we get two values in the same second,
take the GREATER of the two values.
We have a bit of leakage on the intervals, but it saves from sending 750 metrics/s to
sending only 1 or 2 at the same time period
=== */
function addMessageToQueue(message)
{
hash = hash || {};
if (!hash[message.timestamp])
hash[message.timestamp] = {};
if (!hash[message.timestamp][message.metric])
hash[message.timestamp][message.metric] = {};
if (!hash[message.timestamp][message.metric][message.source])
{
var obj = [message.source, message.metric, message.measure, message.timestamp];
hash[message.timestamp][message.metric][message.source] = obj;
messageQueue.push(obj);
}
else
{
var existing = hash[message.timestamp][message.metric][message.source];
if (existing[2] < message.measure)
existing[2] = message.measure;
}
}
/* ===
To process a message, convert it to JSON, validate it has the required fields
and then add it to a queue to be sent to Graphdat
=== */
function process(msg, rinfo)
{
// convert the message to JSON, if its bad, throw it away
var packet;
try
{
packet = JSON.parse(msg.toString());
}
catch(ex)
{
if (DEBUG_LOG_INVALID_MESSAGES)
logger.error('%s: ERR Message is not JSON:\n%s', formatDate(new Date()), msg);
invalidMessages++;
return;
}
// validate the message has the required fields and set the timestamps if they are missing
validate(packet, function(err, validMessage)
{
if (err)
{
if (DEBUG_LOG_INVALID_MESSAGES)
logger.error('%s: ERR Invalid Message:\n%j', formatDate(new Date()), packet);
invalidMessages++;
return;
}
if (DEBUG_LOG_VALID_MESSAGES)
logger.log('%s: Added Message:\n%j', formatDate(new Date()), packet);
addMessageToQueue(validMessage);
validMessages++;
});
}
/* ===
Graphdat can accept metrics as a single metric or as a bulk format, by default
we send metrics to the bulk API in the following format
curl https://api.graphdat.com/v1/measurements \
-u <email>:<api-token> \
-X POST \
-H "Content-Type: application/json" \
-d '
[
[
"myserver",
"PAGE_FAULTS",
2,
1377043134
],
[
"myserver",
"CACHE_MEMORY_USED",
0.7,
1377043134
]
]'
=== */
function sendToGraphdat(messageList, callback)
{
var bodyString = JSON.stringify(messageList);
var options = {
method: 'POST',
host: 'api.graphdat.com',
port: 443,
path: '/v1/measurements',
auth: CONFIG_EMAIL + ':' + CONFIG_APITOKEN,
headers: {
'Content-Length': bodyString.length,
'Content-Type': 'application/json'
}
};
try
{
var req = _https.request(options, function(resp) {
resp.on('end', function ()
{
callback(null);
});
resp.on('error', function(err)
{
callback(err);
});
});
req.on('error', function(err)
{
callback(err);
});
req.write(bodyString);
req.end();
}
catch(ex)
{
callback(ex);
}
}
/* ===
Flush the message queue and send the messages to Graphdat on an interval and
reset the metrics
=== */
function flushMessages()
{
if (messageQueue.length > 0)
{
var list = messageQueue;
messageQueue = [];
hash = {};
sendToGraphdat(list, function(err)
{
if (err)
{
logger.error('%s: ERR sending message: %s', formatDate(new Date()), err);
}
else
{
sentMessages += list.length;
if (DEBUG_LOG_SENT_MESSAGE_QUEUE)
logger.log('%s: Sent Messages:\n%j', formatDate(new Date()), JSON.stringify(list));
}
if (DEBUG_LOG_STATUS)
{
logger.log('%s: Queue Flushed: %d invalid messages, %d valid messages, %d messages sent to Graphdat', formatDate(new Date()), invalidMessages, validMessages, sentMessages);
}
list = null;
invalidMessages = 0;
sentMessages = 0;
validMessages = 0;
setTimeout(flushMessages, FLUSH_TIME);
});
}
else
{
if (DEBUG_LOG_STATUS)
{
logger.log('%s: Queue Flushed: %d invalid messages, %d valid messages, %d sent messages', formatDate(new Date()), invalidMessages, validMessages, sentMessages);
}
invalidMessages = 0;
sentMessages = 0;
validMessages = 0;
setTimeout(flushMessages, FLUSH_TIME);
}
}
// start the UDP server
var udpServer = _dgram.createSocket('udp4', process);
udpServer.bind(SERVER_PORT);
// start the message queue
flushMessages();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment