@heri16
Last active October 30, 2020 12:33
AWS Lambda to filter CloudTrail management logs in S3 (before Athena)
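For context, CloudTrail delivers each log file to S3 as a gzipped JSON document with a single top-level Records array. The fields the pipeline below inspects look roughly like this (a truncated sketch; all values are illustrative):

{
  "Records": [
    {
      "eventVersion": "1.05",
      "eventTime": "2020-10-28T08:25:00Z",
      "eventSource": "sts.amazonaws.com",
      "eventName": "AssumeRoleWithWebIdentity",
      "awsRegion": "us-east-1",
      "userIdentity": { "type": "WebIdentityUser" },
      "requestParameters": { "roleArn": "arn:aws:iam::123456789012:role/example-role" }
    }
  ]
}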
filter.js

const { chain } = require('stream-chain');

const { parser } = require('stream-json');
const { pick } = require('stream-json/filters/Pick');
const { streamValues } = require('stream-json/streamers/StreamValues');
const { disassembler } = require('stream-json/Disassembler');
const { stringer } = require('stream-json/Stringer');
const ST = require('stream-template');

const aws = require('aws-sdk');
const s3 = new aws.S3();

const fs = require('fs');
const zlib = require('zlib');

const defaultFilterOpts = {
  arrayPath: '^Records',
  eventSource: 'sts.amazonaws.com',
  eventName: 'AssumeRoleWithWebIdentity'
};
const processJSON = ({ fileIn, filterOpts = defaultFilterOpts }, callback) => {
  const { arrayPath, eventSource, eventName } = filterOpts;

  // assemble the processing pipeline:
  const recordsOut = chain([
    fileIn,
    // gunzip the potentially huge file using little memory
    zlib.createGunzip(),
    // parse the gunzipped bytes into a JSON token stream
    parser(),
    // pick the individual elements of the Records array
    pick({ filter: new RegExp(`${arrayPath}\\.\\d+`) }),
    // assemble them into JS objects and stream them
    streamValues({
      objectFilter: ({ key, stack, current }) => {
        if (key === null && stack.length === 0) {
          // filter them according to our criteria
          if (current.hasOwnProperty('eventSource') && current.eventSource !== eventSource) return false;
          if (current.hasOwnProperty('eventName')) return current.eventName === eventName;
        }
        // returning undefined keeps assembling until a decision can be made
      }
    }),
    // disassemble the stream of objects back into tokens
    disassembler(),
    // normalize the disassembler output
    pick({ filter: 'value' }),
    // and back to JSON, as an array of objects
    stringer({ makeArray: true })
  ], { writableObjectMode: false, readableObjectMode: false });

  // it is always good to keep an eye on errors
  recordsOut.on('error', callback || console.error);

  // add back the JSON wrapper
  const jsonOut = ST`{"Records":${recordsOut}}`;

  // gzip the potentially huge output using little memory
  const gzip = zlib.createGzip();

  // If the Readable stream emits an error during processing, the Writable destination is not closed automatically,
  // so each stream must be closed manually to prevent memory leaks.
  jsonOut.on('error', err => gzip.destroy(err));

  // return the gzip stream (with backpressure support)
  return jsonOut.pipe(gzip);

  // Or pipe the result to stdout to inspect it:
  // jsonOut.pipe(process.stdout);
};
// Cannot use an async handler here, as a nested callback is required
exports.handler = function(event, context, callback) {
  const evt = event.Records[0];
  const Bucket = evt.s3.bucket.name;
  const Key = evt.s3.object.key;
  const VersionId = evt.s3.object.versionId;
  const params = { Bucket, Key, VersionId };

  s3.headObject(params).promise().then((prevMeta) => {
    // drop response fields that are not valid upload parameters
    delete prevMeta.AcceptRanges;
    delete prevMeta.LastModified;
    delete prevMeta.ContentLength;
    delete prevMeta.ETag;

    const fileIn = s3.getObject(params).createReadStream();
    const stream = processJSON({ fileIn }, callback);

    const uploadParams = { ...params, ...prevMeta, Key: `assumeRole${Key}`, Body: stream };
    // use upload() rather than putObject(): putObject() requires a known ContentLength,
    // which a streaming Body cannot provide
    return s3.upload(uploadParams).promise();
  }).then(data => callback(null, data), callback);
};
// For local testing only; the guard keeps this from running on Lambda
if (require.main === module) {
  const filename = '080428581103_CloudTrail_us-east-1_20201028T0825Z_kN5ZaldemkDIwXsL.json.gz';
  const fileIn = fs.createReadStream(filename);
  const fileOut = fs.createWriteStream('out-' + filename);
  processJSON({ fileIn }).pipe(fileOut).on('error', console.error);
}
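To exercise the handler itself without deploying, the standard S3 event-notification shape can be mocked. A minimal sketch (bucket name and object key are placeholders, and the call still reaches S3, so it needs credentials and a real object):

// invoke-local.js -- hypothetical harness, not part of this gist
const { handler } = require('./filter');

const event = {
  Records: [{
    s3: {
      bucket: { name: 'my-cloudtrail-bucket' },                        // placeholder
      object: { key: 'AWSLogs/trail.json.gz', versionId: undefined }   // placeholder
    }
  }]
};

handler(event, {}, (err, data) => err ? console.error(err) : console.log(data));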
package.json

{
  "name": "cloudtrail-filter",
  "version": "1.0.0",
  "description": "",
  "main": "filter.js",
  "scripts": {
    "start": "node .",
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "author": "",
  "license": "ISC",
  "dependencies": {
    "aws-sdk": "^2.782.0",
    "stream-json": "^1.7.1",
    "stream-template": "^0.0.10"
  }
}
heri16 commented Oct 30, 2020

This Lambda uses the excellent stream-json library.

JSON.parse() is not used because it would have to buffer the entire log file in memory, and the function would crash whenever a file's size exceeds the memory available to the Lambda.
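For comparison, a minimal sketch of the streaming approach (the input file name is hypothetical): stream-json emits each element of the Records array as it is parsed, so memory use stays roughly constant no matter how large the file is.

const fs = require('fs');
const zlib = require('zlib');
const { chain } = require('stream-chain');
const { parser } = require('stream-json');
const { pick } = require('stream-json/filters/Pick');
const { streamArray } = require('stream-json/streamers/StreamArray');

// Count AssumeRoleWithWebIdentity events without holding the full file in memory.
const pipeline = chain([
  fs.createReadStream('trail.json.gz'),  // hypothetical input
  zlib.createGunzip(),
  parser(),
  pick({ filter: 'Records' }),           // descend into the Records array
  streamArray()                          // emit { key, value } for each element
]);

let count = 0;
pipeline.on('data', ({ value }) => {
  if (value.eventName === 'AssumeRoleWithWebIdentity') count += 1;
});
pipeline.on('end', () => console.log(`matched ${count} records`));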
