Skip to content

Instantly share code, notes, and snippets.

@vgw-rhysc
Created October 12, 2018 07:55
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vgw-rhysc/e88194c22595530568ef8819fe5eb2fe to your computer and use it in GitHub Desktop.
Save vgw-rhysc/e88194c22595530568ef8819fe5eb2fe to your computer and use it in GitHub Desktop.
AWS Lambda to translate Snowplow CloudFront logs to usable JSON - Using S3 create events as the trigger
// Logged once when the module is loaded, before any handler invocation.
console.log('Loading function');
const aws = require('aws-sdk');
// Used below for gzip decompression (gunzipSync) and compression (gzipSync).
const zlib = require('zlib');
// Parses the raw CloudFront access-log text into one object per request.
const CloudFrontParser = require('cloudfront-log-parser');
// Single S3 client shared by every function in this file; the API version is pinned explicitly.
const s3 = new aws.S3({ apiVersion: '2006-03-01' });
/**
 * Downloads a gzip-compressed object from S3 and returns its decompressed text.
 *
 * @param {string} bucketName - Name of the S3 bucket to read from.
 * @param {string} objectKey - Key of the object inside the bucket.
 * @returns {Promise<string>} The gunzipped object body decoded as UTF-8.
 * @throws {Error} When the download or the decompression fails. The message keeps
 *   the original troubleshooting hint and appends the underlying failure; the
 *   original error is also attached as `cause`.
 */
const getS3FileContents = async (bucketName, objectKey) => {
  const params = {
    Bucket: bucketName,
    Key: objectKey,
  };
  try {
    const { Body } = await s3.getObject(params).promise();
    // Buffer.from replaces the deprecated (and unsafe) `new Buffer(...)` constructor.
    return zlib.gunzipSync(Buffer.from(Body)).toString('utf8');
  } catch (err) {
    console.log(err);
    // Surface the real failure (access denied, missing key, corrupt gzip, ...) instead
    // of hiding it behind a generic hint that is frequently wrong.
    const detail = err && err.message ? err.message : String(err);
    const message = `Error getting object ${objectKey} from bucket ${bucketName} or decoding object. Make sure they exist and your bucket is in the same region as this function. Cause: ${detail}`;
    console.log(message);
    const wrapped = new Error(message);
    // Assigned manually so the cause is preserved on runtimes without the
    // `new Error(message, { cause })` option.
    wrapped.cause = err;
    throw wrapped;
  }
};
/**
 * Gzip-compresses a JSON string and stores it as an object in S3.
 *
 * The object is written with ContentType "application/json" and
 * ContentEncoding "gzip".
 *
 * @param {string} bucketName - Name of the destination S3 bucket.
 * @param {string} objectKey - Key under which the object is stored.
 * @param {string} jsonString - Already-serialised JSON text to compress and upload.
 * @returns {Promise<void>} Resolves once the upload has completed.
 * @throws {Error} When the upload fails. The message keeps the original
 *   troubleshooting hint and appends the underlying failure; the original error
 *   is also attached as `cause`.
 */
const putJsonObjAsGzipToS3 = async (bucketName, objectKey, jsonString) => {
  // Buffer.from is a factory function, not a constructor, so it is called without `new`.
  const encodedBody = zlib.gzipSync(Buffer.from(jsonString));
  const params = {
    Bucket: bucketName,
    Key: objectKey,
    Body: encodedBody,
    ContentType: 'application/json',
    ContentEncoding: 'gzip',
  };
  try {
    await s3.putObject(params).promise();
  } catch (err) {
    console.log(err);
    // Include the real failure so the generic hint does not mask the root cause.
    const detail = err && err.message ? err.message : String(err);
    const message = `Error saving object ${objectKey} to bucket ${bucketName}. Make sure they exist and your bucket is in the same region as this function. Cause: ${detail}`;
    console.log(message);
    const wrapped = new Error(message);
    // Assigned manually so the cause is preserved on runtimes without the
    // `new Error(message, { cause })` option.
    wrapped.cause = err;
    throw wrapped;
  }
};
/**
 * Enriches one parsed CloudFront access-log record with the Snowplow tracker
 * parameters found in its query string.
 *
 * The record is mutated in place: a `snowplowData` property is added holding
 * every query-string parameter, plus `context` (decoded from base64 `cx`) and
 * `unstruct_event` (decoded from base64 `ue_px`) when those parameters exist.
 * Records whose `cs-uri-query` is missing or the placeholder "-" are returned
 * untouched.
 *
 * SEE - https://github.com/snowplow/snowplow/wiki/snowplow-tracker-protocol#21-web-specific-parameters
 *
 * @param {Object} access - Parsed log record; its `cs-uri-query` field is read.
 * @returns {Object} The same `access` object that was passed in.
 */
const mapCloudFrontObjectToSnowPlowObject = (access) => {
  const queryString = access['cs-uri-query'];
  // A missing field is treated like the "-" placeholder instead of producing a
  // bogus {"undefined": undefined} entry.
  if (queryString == null || queryString === '-') return access;

  // The query string is decoded twice, as in the original implementation.
  // decodeURI throws URIError on a malformed escape; fall back to the raw text so a
  // single bad request cannot abort the processing of the whole log file.
  let decoded = String(queryString);
  try {
    decoded = decodeURI(decodeURI(queryString));
  } catch (err) {
    if (!(err instanceof URIError)) throw err;
  }

  const dict = {};
  decoded.split('&').forEach((element) => {
    if (element === '') return;
    // Split on the FIRST "=" only, so values that themselves contain "="
    // (embedded URLs, base64 padding) are kept intact.
    const separatorIndex = element.indexOf('=');
    if (separatorIndex === -1) {
      dict[element] = undefined;
      return;
    }
    dict[element.slice(0, separatorIndex)] = element.slice(separatorIndex + 1);
  });

  if (dict['cx']) {
    dict['context'] = Buffer.from(dict['cx'], 'base64').toString('utf8');
  }
  if (dict['ue_px']) {
    dict['unstruct_event'] = Buffer.from(dict['ue_px'], 'base64').toString('utf8');
  }
  access['snowplowData'] = dict;
  return access;
};
/**
 * Turns the text of a CloudFront "web" access log into Snowplow-enriched records.
 *
 * @param {string} fileContents - Decompressed text of one CloudFront log file.
 * @returns {Array<Object>} One record per parsed log entry, each passed through
 *   mapCloudFrontObjectToSnowPlowObject.
 */
let parseCloudfrontSnowplowLogs = (fileContents) =>
  CloudFrontParser
    .parse(fileContents, { format: 'web' })
    .map(mapCloudFrontObjectToSnowPlowObject);
exports.handler = async (event, context) => {
const bucket = event.Records[0].s3.bucket.name;
const key = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
var fileContents = await getS3FileContents(bucket, key);
var snowplowEvents = await parseCloudfrontSnowplowLogs(fileContents);
let stagedContent = JSON.stringify(snowplowEvents, null, 0)
console.log('snowplowEvents');
console.log(stagedContent);
await putJsonObjAsGzipToS3(bucket, 'staged/' + key, stagedContent);
return 'Ok'
};
{
"name": "clickstream-log-reader",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"prepackage": "rm -f lambda.zip && mkdir -p lambda/node_modules/ && rm -rf ./node_modules && npm install --prod",
"package": "cp ./index.js ./lambda/ && cp -r ./node_modules/. ./lambda/node_modules/ && cd lambda && zip -r ../lambda.zip *",
"postpackage": "rm -rf lambda && rm -rf ./node_modules && npm install",
"clean": "rm -fr ./output"
},
"author": "",
"license": "ISC",
"dependencies": {
"cloudfront-log-parser": "^1.0.0",
"zlib": "^1.0.5"
}
}
@kennethrorydunn
Copy link

Please change the error handling code to ensure a useful error message is returned.

From:
const message = `Error getting object ${objectKey} from bucket ${bucketName} or decoding object. Make sure they exist and your bucket is in the same region as this function.`;
console.log(message);

To:
console.log(err, err.stack);
throw new Error(err);

Amazingly this exact error message has spread across 50 GitHub Gists. #copypasta

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment