Skip to content

Instantly share code, notes, and snippets.

@kesor
Created February 14, 2017 12:29
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save kesor/c20f84cf605db24ca0b51612d1ab1367 to your computer and use it in GitHub Desktop.
Lambda function to send events from AWS CloudWatch Logs to ElasticSearch.
var http = require('http');
var zlib = require('zlib');
var ts = [ 0 ];
var DEBUG = false; // set to `true` to enable timestamps in results
// private IP of the ELK server in "Default VPC"
var endpointHost = '172.31.10.250';
var endpointPort = 9200;
var request_timeout = 800; // milliseconds
function debug(state) {
if (DEBUG) { ts.push(state, Date.now() - ts[0]); }
}
exports.handler = function(input, context) {
ts = [ Date.now() ];
// decode input from base64
var zippedInput = new Buffer(input.awslogs.data, 'base64');
// decompress the input
zlib.gunzip(zippedInput, function(error, buffer) {
if (error) { context.fail(error); return; }
// parse the input from JSON
var awslogsData = JSON.parse(buffer.toString('utf8'));
debug("parsed gzip");
// transform the input to Elasticsearch documents
var elasticsearchBulkData = transform(awslogsData);
debug("transformed json");
// skip control messages
if (!elasticsearchBulkData) {
context.succeed('Control message handled successfully '+JSON.stringify(ts));
return;
}
// post documents to the Amazon Elasticsearch Service
var requestParams = {
host: endpointHost,
port: endpointPort,
method: 'POST',
path: '/_bulk',
body: elasticsearchBulkData,
headers: {
'Content-Type': 'application/json',
'Host': endpointHost + ':' + endpointPort,
'Content-Length': Buffer.byteLength(elasticsearchBulkData)
}
};
debug("sending request");
var request = http.request(requestParams, function(response) {
debug("started response");
var responseBody = '';
response.on('data', function(chunk) {
responseBody += chunk;
});
response.on('end', function() {
debug("finished response");
var info = JSON.parse(responseBody);
var failedItems;
if (response.statusCode >= 200 && response.statusCode < 299) {
failedItems = info.items.filter(function(x) {
return x.index.status >= 300;
});
}
var error = response.statusCode !== 200 || info.errors === true ? {
"statusCode": response.statusCode,
"responseBody": responseBody
} : null;
if (error) {
console.log('Error: ' + JSON.stringify(error, null, 2));
if (failedItems && failedItems.length > 0) {
console.log("Failed Items: " + JSON.stringify(failedItems, null, 2));
}
context.fail(JSON.stringify([ error, ts ]));
} else {
context.succeed('Success '+JSON.stringify(ts));
}
});
}).on('error', function(e) {
context.fail(e);
});
request.on('socket', function(socket) {
socket.setTimeout(request_timeout, function() {
request.abort();
});
debug("socket established");
});
request.end(requestParams.body);
});
};
function transform(payload) {
if (payload.messageType === 'CONTROL_MESSAGE') {
return null;
}
var bulkRequestBody = '';
payload.logEvents.forEach(function(logEvent) {
var timestamp = new Date(1 * logEvent.timestamp);
// index name format: cwl-YYYY.MM.DD.HH
var indexName = [
'cwl-' + timestamp.getUTCFullYear(), // year
('0' + (timestamp.getUTCMonth() + 1)).slice(-2), // month
('0' + timestamp.getUTCDate()).slice(-2), // day
('0' + timestamp.getUTCHours()).slice(-2) // hour
].join('.');
var source = {};
source['@timestamp'] = timestamp.toISOString();
source.message = logEvent.message;
source.gateway = payload.logGroup.replace(/^[^_]+_/, '');
var action = { "index": {} };
action.index._index = indexName;
action.index._type = 'cwl';
action.index._id = logEvent.id;
bulkRequestBody += [
JSON.stringify(action),
JSON.stringify(source)
].join('\n') + '\n';
});
return bulkRequestBody;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment