@heri16
Last active October 30, 2020 12:33
AWS Lambda to filter CloudTrail management logs in S3 (before Athena)
const { chain } = require('stream-chain');
const { parser } = require('stream-json');
const { pick } = require('stream-json/filters/Pick');
const { filter } = require('stream-json/filters/Filter');
const { streamValues } = require('stream-json/streamers/StreamValues');
const { disassembler } = require('stream-json/Disassembler');
const { stringer } = require('stream-json/Stringer');
const ST = require('stream-template');
const aws = require('aws-sdk');
const s3 = new aws.S3();
const fs = require('fs');
const zlib = require('zlib');
const defaultFilterOpts = {
  arrayPath: '^Records',
  eventSource: 'sts.amazonaws.com',
  eventName: 'AssumeRoleWithWebIdentity'
};
const processJSON = ({ fileIn, filterOpts = defaultFilterOpts }, callback) => {
  const { arrayPath, eventSource, eventName } = filterOpts;

  // assemble the processing pipeline:
  const recordsOut = chain([
    fileIn,
    // gunzip the potentially huge file using little memory
    zlib.createGunzip(),
    // parse the JSON into a token stream
    parser(),
    // pick the individual elements of the Records array
    pick({ filter: new RegExp(`${arrayPath}\\.\\d+`) }),
    // assemble them into JS values and stream them
    streamValues({
      objectFilter: ({ key, stack, current }) => {
        // filter top-level records according to our criteria;
        // returning undefined keeps assembling until both keys are seen
        if (key === null && stack.length === 0) {
          if (current.hasOwnProperty('eventSource') && current.eventSource !== eventSource) return false;
          if (current.hasOwnProperty('eventName')) return current.eventName === eventName;
        }
      }
    }),
    // disassemble the stream of objects back into tokens
    disassembler(),
    // normalize the disassembler output
    pick({ filter: 'value' }),
    // back to JSON as an array of objects
    stringer({ makeArray: true })
  ], { writableObjectMode: false, readableObjectMode: false });

  // it is always good to keep an eye on errors
  recordsOut.on('error', callback || console.error);

  // add back the JSON wrapper
  const jsonOut = ST`{"Records":${recordsOut}}`;

  // gzip the potentially huge output using little memory
  const gzip = zlib.createGzip();

  // If the Readable stream emits an error during processing, the Writable
  // destination is not closed automatically; each stream must be closed
  // manually to prevent memory leaks.
  jsonOut.on('error', err => gzip.destroy(err));

  // return the gzip stream (with backpressure support)
  return jsonOut.pipe(gzip);

  // Pipe the result to stdout so we can see it
  // jsonOut.pipe(process.stdout);
};
// Cannot use async/await here, as a nested callback is required
exports.handler = function (event, context, callback) {
  const evt = event.Records[0];
  const Bucket = evt.s3.bucket.name;
  // S3 event notification keys are URL-encoded, with spaces as '+'
  const Key = decodeURIComponent(evt.s3.object.key.replace(/\+/g, ' '));
  const VersionId = evt.s3.object.versionId;
  const params = { Bucket, Key, VersionId };

  s3.headObject(params).promise().then((prevMeta) => {
    // drop read-only response fields that putObject does not accept
    delete prevMeta.LastModified;
    delete prevMeta.ContentLength;
    delete prevMeta.ETag;
    const fileIn = s3.getObject(params).createReadStream();
    const stream = processJSON({ fileIn }, callback);
    const uploadParams = { ...params, ...prevMeta, Key: `assumeRole${Key}`, Body: stream };
    return s3.putObject(uploadParams).promise();
  }).then(data => callback(null, data), callback);
};
// For local testing only (skipped when loaded by the Lambda runtime)
if (require.main === module) {
  const filename = '080428581103_CloudTrail_us-east-1_20201028T0825Z_kN5ZaldemkDIwXsL.json.gz';
  const fileIn = fs.createReadStream(filename);
  const fileOut = fs.createWriteStream('out-' + filename);
  processJSON({ fileIn }).pipe(fileOut).on('error', console.error);
}
{
  "name": "cloudtrail-filter",
  "version": "1.0.0",
  "description": "",
  "main": "filter.js",
  "scripts": {
    "start": "node .",
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "author": "",
  "license": "ISC",
  "dependencies": {
    "aws-sdk": "^2.782.0",
    "stream-json": "^1.7.1",
    "stream-template": "^0.0.10"
  }
}
heri16 commented Oct 30, 2020

All management events are delivered to S3 every 5 minutes by CloudTrail (free of charge if you set up a trail that delivers a single copy of management events in each region).

Athena scans these log files, which may contain management events that are irrelevant to us and which grow in size over time.

This Lambda S3 trigger filters the logs down to only the management events that need to be kept long-term, reducing processing costs in Athena (which charges per byte scanned).
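For reference, the Lambda can be wired to the trail's bucket with an S3 event notification along these lines (a sketch only: the function ARN and bucket layout are placeholders, and the `AWSLogs/` prefix rule assumes CloudTrail's default key layout so the function does not retrigger on its own `assumeRole` output):

```json
{
  "LambdaFunctionConfigurations": [
    {
      "LambdaFunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:cloudtrail-filter",
      "Events": ["s3:ObjectCreated:*"],
      "Filter": {
        "Key": {
          "FilterRules": [
            { "Name": "prefix", "Value": "AWSLogs/" },
            { "Name": "suffix", "Value": ".json.gz" }
          ]
        }
      }
    }
  ]
}
```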

heri16 commented Oct 30, 2020

This Lambda uses the excellent stream-json library.

JSON.parse() is not used, because it would crash if the size of the log file exceeded the memory available to the Lambda function.
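A minimal sketch of how the `objectFilter` used above behaves (the function name `keepRecord` is ours, not part of stream-json): returning false drops a partially assembled record early, true keeps it, and undefined tells stream-json to keep assembling until enough keys have arrived.

```javascript
// Sketch of the record predicate behind streamValues' objectFilter.
function keepRecord(current, { eventSource, eventName }) {
  // wrong eventSource: drop the record as soon as the key is seen
  if (Object.prototype.hasOwnProperty.call(current, 'eventSource') &&
      current.eventSource !== eventSource) return false;
  // eventName seen: decide definitively
  if (Object.prototype.hasOwnProperty.call(current, 'eventName')) {
    return current.eventName === eventName;
  }
  return undefined; // keys not seen yet; keep assembling
}

const opts = { eventSource: 'sts.amazonaws.com', eventName: 'AssumeRoleWithWebIdentity' };
console.log(keepRecord({ eventSource: 'ec2.amazonaws.com' }, opts)); // false
console.log(keepRecord({ eventSource: 'sts.amazonaws.com', eventName: 'AssumeRoleWithWebIdentity' }, opts)); // true
```

Dropping records as soon as a disqualifying key appears is what keeps memory flat: irrelevant records never get fully assembled.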
