Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
AWS Lambda for shipping logs from S3 to CloudWatch
const zlib = require('zlib');
const aws = require('aws-sdk');
// Module-level AWS clients used by the handler, each pinned to an explicit API version.
const [s3, cw] = [
  new aws.S3({apiVersion: '2006-03-01'}),
  new aws.CloudWatchLogs({apiVersion: '2015-01-28'}),
];
// Three-letter English month abbreviations; the array index is the 0-based month number.
const months = 'Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split(' ');
// Value compared against `err.code` so that creating an already-present log group/stream is tolerated.
const AlreadyExists = 'ResourceAlreadyExistsException';
// Thanks https://github.com/caolan/async/blob/master/lib/waterfall.js
/**
 * Run Node-style callback tasks one after another, feeding each task's results to the next.
 *
 * Every task is invoked as task(next, ...resultsOfPreviousTask). The chain stops at the first
 * error that `ignoreError` does not excuse, or once the last task reports back; `callback` then
 * receives (errorOrNull, ...resultsOfThatTask). An excused error is replaced by null and the
 * chain continues with the remaining results of the task that raised it.
 *
 * @param {Function[]} tasks - tasks to run in order; a missing or empty list calls `callback` immediately
 * @param {Function} [callback] - final Node-style callback (defaults to a no-op)
 * @param {Function} [ignoreError] - (err, startedTaskCount) => truthy to excuse `err` and keep going
 * @returns {*} the callback's return value when `tasks` is missing/empty, otherwise undefined
 */
function cascade(tasks, callback, ignoreError)
{
  const finish = callback || (() => undefined);
  const isExcused = ignoreError || (() => false);
  if (!tasks || !tasks.length) {
    return finish();
  }

  // Number of tasks launched so far (also the index of the next task to launch).
  let started = 0;

  // Handed to every task as its completion callback.
  const advance = (err, ...results) => {
    const excused = err && isExcused(err, started);
    const stop = (err && !excused) || started === tasks.length;
    if (!stop) {
      return launch(results);
    }
    return finish.call(null, excused ? null : err, ...results);
  };

  const launch = (results) => {
    const task = tasks[started];
    started += 1;
    return task.call(null, advance, ...results);
  };

  launch([]);
}
// Thanks http://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html (see: Time)
// Thanks http://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/AccessLogs.html#BasicDistributionFileFormat
/**
 * Extract a timestamp (milliseconds since the Unix epoch, UTC) from a log line.
 *
 * Recognized formats, tried in order:
 *  - Amazon "Server Access Log" time: strftime [%d/%b/%Y:%H:%M:%S %z]
 *    (ex. [06/Feb/2014:00:00:38 +0000]). The %z offset, when present, is applied.
 *    A match whose month abbreviation is not in `months` is rejected.
 *  - ISO 8601 (ex. 2014-05-23T01:13:11Z, 2014-05-23T03:13:11+02:00) or the
 *    Amazon "CloudFront Access Log" time: strftime %Y-%m-%d%t%H:%M:%S (ex. 2014-05-23 01:13:11),
 *    read as UTC. A trailing `Z` or `±HH[:]MM` offset is applied; fractional seconds are ignored.
 *
 * @param {string} dateString - a log line (or any string containing one of the formats above)
 * @returns {number} epoch milliseconds, or Date.now() when no recognizable time is found
 */
function parseTime(dateString)
{
  // Milliseconds represented by a "+HHMM"/"-HH:MM" zone designator (0 when absent or "Z").
  const offsetMillis = (sign, hours, minutes) => (
    sign ? (sign === '-' ? -1 : 1) * (Number(hours) * 60 + Number(minutes)) * 60000 : 0
  );

  const serverAccess = dateString.match(/(\d{2})\/([A-Z][a-z]{2})\/(\d{4}):(\d{2}):(\d{2}):(\d{2})(?:\s([+-])(\d{2})(\d{2}))?/);
  // indexOf returns -1 for an unknown abbreviation, which Date.UTC would silently turn into
  // December of the previous year; only accept the match when the month is recognized.
  const month = serverAccess ? months.indexOf(serverAccess[2]) : -1;
  if (month !== -1) {
    const [, day, , year, hh, mm, ss, sign, offH, offM] = serverAccess;
    // Local time = UTC + offset, so subtract the offset to get UTC.
    return Date.UTC(Number(year), month, Number(day), Number(hh), Number(mm), Number(ss))
      - offsetMillis(sign, offH, offM);
  }

  const iso = dateString.match(/(\d{4})-(\d{2})-(\d{2})[T\s](\d{2}):(\d{2}):(\d{2})(?:\.\d+)?(?:Z|([+-])(\d{2}):?(\d{2}))?/);
  if (iso) {
    const [, year, mon, day, hh, mm, ss, sign, offH, offM] = iso;
    return Date.UTC(Number(year), Number(mon) - 1, Number(day), Number(hh), Number(mm), Number(ss))
      - offsetMillis(sign, offH, offM);
  }

  return Date.now();
}
// Thanks https://github.com/rajatkumar/cloudwatchlogger/blob/master/lib/CloudWatchLogsStream.js
exports.handler = (event, context, callback) => {
let sequenceToken = null;
const eventRecord = event.Records[0]; // only ever one record
const {bucket: {name: Bucket}, object} = eventRecord.s3;
const Key = decodeURIComponent(object.key.replace(/\+/g, ' '));
const pathComponents = Key.split('/');
const logGroupName = pathComponents.splice(0, 1)[0]; // first path component
const logId = pathComponents.join('/'); // all but the first path component (joined with '/')
// If logId contains a date in the YYYY-mm-dd format, use that as the stream name
const deliveryDate = logId.match(/\d{4}-\d{2}-\d{2}/);
const logStreamName = deliveryDate ? deliveryDate[0] : logId;
function getObject(cb)
{
s3.getObject({Bucket, Key}, cb);
}
function deleteObject(cb)
{
s3.deleteObject({Bucket, Key}, cb);
}
function parseLogEvents(cb, getObjectResponse)
{
cb(null, getObjectResponse.Body.toString().trim().split('\n').reduce((a, line) => {
// Ignore lines that begin with # (to accommodate CloudFront format)
if(!line.startsWith('#')) {
// Append log file source to the message body
a.push({message: line.trim() + (` s3://${Bucket}/${Key}`), timestamp: parseTime(line)});
console.log('Parsed: ', a[a.length - 1]);
}
else {
console.log('Ignored: ', line);
}
return a;
}, []));
}
function putLogEvents(cb, logEvents)
{
if(!logEvents.length) { return cb(new Error('Nothing to log')); }
cw.putLogEvents({logStreamName, logGroupName, logEvents, sequenceToken}, cb);
}
function createStream(cb)
{
cw.createLogStream({logGroupName, logStreamName}, cb);
}
function createGroup(cb)
{
cw.createLogGroup({logGroupName}, cb);
}
function describeStream(cb)
{
cw.describeLogStreams({logGroupName, logStreamNamePrefix: logStreamName, limit: 1}, cb);
}
function setSequenceToken(cb, data)
{
// If sequenceToken is missing, the stream may have just been created
const valid = (data && data.logStreams && data.logStreams.length);
sequenceToken = !valid ? null : data.logStreams[0].uploadSequenceToken;
cb(null);
}
function createLogger(cb)
{
// Ignore AlreadyExists exceptions (they just mean there is nothing to do before continuing)
cascade([createGroup, createStream], cb, (err) => (err.code === AlreadyExists));
}
cascade([
createLogger,
describeStream,
setSequenceToken,
getObject,
parseLogEvents,
putLogEvents,
deleteObject
], callback);
};
@adamjarret

This comment has been minimized.

Copy link
Owner Author

commented Oct 16, 2017

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.