Skip to content

Instantly share code, notes, and snippets.

@hendrixroa
Created January 18, 2020 22:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hendrixroa/d7a585673c74f495d480cbec5cb1e312 to your computer and use it in GitHub Desktop.
Save hendrixroa/d7a585673c74f495d480cbec5cb1e312 to your computer and use it in GitHub Desktop.
AWS Lambda function to stream cloudwatch logs to elasticsearch with some modification to accept multiples indices
const https = require('https');
const zlib = require('zlib');
const crypto = require('crypto');
const FunctionShield = require('@puresec/function-shield');
const ENV = process.env;
const endpoint = ENV.es_endpoint;
FunctionShield.configure(
{
policy: {
read_write_tmp: 'alert',
create_child_process: 'alert',
outbound_connectivity: 'alert',
read_handler: 'alert'
},
disable_analytics: false,
token: ENV.function_shield_token
});
exports.handler = (input, context) => {
// decode input from base64
const zippedInput = new Buffer(input.awslogs.data, 'base64');
// decompress the input
zlib.gunzip(zippedInput, (error, buffer) => {
if (error) { context.fail(error); return; }
// parse the input from JSON
const awslogsData = JSON.parse(buffer.toString('utf8'));
// transform the input to Elasticsearch documents
const elasticsearchBulkData = transform(awslogsData);
// skip control messages
if (!elasticsearchBulkData) {
context.succeed('Control message handled successfully');
return;
}
// post documents to the Amazon Elasticsearch Service
post(elasticsearchBulkData, (error, success, statusCode, failedItems) => {
if (error) {
console.error(`Error: ${JSON.stringify(error, null, 2)}`);
if (failedItems && failedItems.length > 0) {
console.log(`Failed Items: ${JSON.stringify(failedItems, null, 2)}`);
}
context.fail(JSON.stringify(error));
} else {
context.succeed('Success');
}
});
});
};
function transform(payload) {
if (payload.messageType === 'CONTROL_MESSAGE') {
return null;
}
let bulkRequestBody = '';
payload.logEvents.forEach((logEvent) => {
const indexName = `sm-${payload.logGroup.toLowerCase()}`;
let source = buildSource(logEvent.message, logEvent.extractedFields);
source['@id'] = logEvent.id;
source['@timestamp'] = new Date(1 * logEvent.timestamp).toISOString();
source['@message'] = logEvent.message;
source['@owner'] = payload.owner;
source['@log_group'] = payload.logGroup;
source['@log_stream'] = payload.logStream;
let action = { "index": {} };
action.index._index = indexName;
action.index._type = payload.logGroup;
action.index._id = logEvent.id;
bulkRequestBody += [
JSON.stringify(action),
JSON.stringify(source),
].join('\n') + '\n';
});
return bulkRequestBody;
}
function buildSource(message, extractedFields) {
let jsonSubString = {};
if (extractedFields) {
let source = {};
for (let key in extractedFields) {
if (extractedFields.hasOwnProperty(key) && extractedFields[key]) {
const value = extractedFields[key];
if (isNumeric(value)) {
source[key] = 1 * value;
continue;
}
jsonSubString = extractJson(value);
if (jsonSubString !== null) {
source['$' + key] = JSON.parse(jsonSubString);
}
source[key] = value;
}
}
return source;
}
jsonSubString = extractJson(message);
if (jsonSubString !== null) {
return JSON.parse(jsonSubString);
}
return {};
}
function extractJson(message) {
const jsonStart = message.indexOf('{');
if (jsonStart < 0) return null;
const jsonSubString = message.substring(jsonStart);
return isValidJson(jsonSubString) ? jsonSubString : null;
}
function isValidJson(message) {
try {
JSON.parse(message);
} catch (e) { return false; }
return true;
}
function isNumeric(n) {
return !isNaN(parseFloat(n)) && isFinite(n);
}
function post(body, callback) {
const requestParams = buildRequest(endpoint, body);
const request = https.request(requestParams, (response) => {
let responseBody = '';
response.on('data', (chunk) => {
responseBody += chunk;
});
response.on('end', () => {
const info = JSON.parse(responseBody);
let failedItems = [];
let success = {};
if (response.statusCode >= 200 && response.statusCode < 299) {
failedItems = info.items.filter((x) => {
return x.index.status >= 300;
});
success = {
"attemptedItems": info.items.length,
"successfulItems": info.items.length - failedItems.length,
"failedItems": failedItems.length
};
}
const error = response.statusCode !== 200 || info.errors === true ? {
"statusCode": response.statusCode,
"responseBody": responseBody
} : null;
callback(error, success, response.statusCode, failedItems);
});
}).on('error', (e) => {
callback(e);
});
request.end(requestParams.body);
}
function buildRequest(endpoint, body) {
const endpointParts = endpoint.match(/^([^\.]+)\.?([^\.]*)\.?([^\.]*)\.amazonaws\.com$/);
const region = endpointParts[2];
const service = endpointParts[3];
const datetime = (new Date()).toISOString().replace(/[:\-]|\.\d{3}/g, '');
const date = datetime.substr(0, 8);
const kDate = hmac('AWS4' + process.env.AWS_SECRET_ACCESS_KEY, date);
const kRegion = hmac(kDate, region);
const kService = hmac(kRegion, service);
const kSigning = hmac(kService, 'aws4_request');
let request = {
host: endpoint,
method: 'POST',
path: '/_bulk',
body: body,
headers: {
'Content-Type': 'application/json',
'Host': endpoint,
'Content-Length': Buffer.byteLength(body),
'X-Amz-Security-Token': process.env.AWS_SESSION_TOKEN,
'X-Amz-Date': datetime
}
};
const canonicalHeaders = Object.keys(request.headers)
.sort((a, b) => {
return a.toLowerCase() < b.toLowerCase() ? -1 : 1;
})
.map((k) => {
return k.toLowerCase() + ':' + request.headers[k];
})
.join('\n');
const signedHeaders = Object.keys(request.headers)
.map((k) => {
return k.toLowerCase();
})
.sort()
.join(';');
const canonicalString = [
request.method,
request.path, '',
canonicalHeaders, '',
signedHeaders,
hash(request.body, 'hex'),
].join('\n');
const credentialString = [ date, region, service, 'aws4_request' ].join('/');
const stringToSign = [
'AWS4-HMAC-SHA256',
datetime,
credentialString,
hash(canonicalString, 'hex')
] .join('\n');
request.headers.Authorization = [
'AWS4-HMAC-SHA256 Credential=' + process.env.AWS_ACCESS_KEY_ID + '/' + credentialString,
'SignedHeaders=' + signedHeaders,
'Signature=' + hmac(kSigning, stringToSign, 'hex')
].join(', ');
return request;
}
function hmac(key, str, encoding) {
return crypto.createHmac('sha256', key).update(str, 'utf8').digest(encoding);
}
function hash(str, encoding) {
return crypto.createHash('sha256').update(str, 'utf8').digest(encoding);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment