Skip to content

Instantly share code, notes, and snippets.

@letrunghieu
Created December 19, 2018 09:08
Show Gist options
  • Save letrunghieu/f2cf5d2324a68a8ec8c209619467185b to your computer and use it in GitHub Desktop.
Save letrunghieu/f2cf5d2324a68a8ec8c209619467185b to your computer and use it in GitHub Desktop.
Lambda transformation
'use strict';
console.log('Loading function');
let patterns = require('node-grok').loadDefaultSync();
let geoip = require('geoip-lite');
let device = require('device');
let countries = require('countryjs');
let moment = require('moment');
/**
 * Grok expression describing one access-log line. Every named capture
 * (`%{PATTERN:name}` or `(?<name>...)`) becomes a property of the object
 * returned by `pattern.parseSync()`; forwardedPort and forwardedProtocol
 * accept a literal "-" in place of a value.
 */
const grokPattern = "%{HTTPDATE:timestamp} %{IP:clientIP} %{NOTSPACE:username} %{WORD:method} %{NOTSPACE:uri} %{IP:serverIP} %{NUMBER:port} %{NOTSPACE:host} %{QUOTEDSTRING:userAgent} %{NOTSPACE:referer} %{NUMBER:status} %{NUMBER:bytesSent} %{NUMBER:requestTime} %{QUOTEDSTRING:forwardedFor} (?<forwardedPort>-|%{NUMBER}) (?<forwardedProtocol>-|%{NOTSPACE})";
// Compiled once at module load rather than once per record.
const pattern = patterns.createPattern(grokPattern);
/**
 * Pick an address out of an X-Forwarded-For value.
 *
 * The value is split on commas and the right-most entry, with surrounding
 * whitespace removed, is returned. If that entry is the literal "-", null is
 * returned so the caller can fall back to the connecting client IP.
 *
 * NOTE(review): the AWS page below describes the LEFT-most entry as the
 * originating client, while this helper returns the RIGHT-most one (the
 * address appended last). That may be deliberate, because left-most entries
 * can be supplied by the client, but confirm which one is intended.
 *
 * @see http://docs.aws.amazon.com/elasticloadbalancing/latest/classic/x-forwarded-headers.html#x-forwarded-for
 * @param {string} strVal - X-Forwarded-For value with its surrounding quotes already removed
 * @return {string|null}
 */
const getForwardedIp = (strVal) => {
  const entries = strVal.split(",");
  const lastEntry = entries[entries.length - 1].trim();
  return lastEntry === "-" ? null : lastEntry;
};
/**
 * Remove the first and last character of a captured field, i.e. the double
 * quotes that surround it in the log line.
 * @param {string} value
 * @return {string}
 */
const stripQuotes = (value) => value.slice(1, -1);

/**
 * Look up `result.originalIP` with geoip-lite, store the lookup result on
 * `result.geoip` (null when nothing is found) and tag the outcome.
 * @param {object} result - document under construction; mutated in place
 */
const addGeoip = (result) => {
  const resolved = geoip.lookup(result.originalIP);
  result.geoip = resolved;
  if (!resolved) {
    result.tags.push('geoip_looked_up_fail');
    return;
  }
  result.tags.push('geoip_looked_up_success');
  resolved.countryName = countries.name(resolved.country);
  // GeoJSON arrays put longitude before latitude, so the `ll` array
  // (read here as [lat, lon]) is replaced by an explicit {lat, lon} object.
  resolved.location = {
    lat: resolved.ll[0],
    lon: resolved.ll[1],
  };
  delete resolved.ll;
};

/**
 * Build the enriched JSON document for a log line that matched the grok pattern.
 * @param {string} entry - the raw log line, kept verbatim as `message`
 * @param {object} match - named captures returned by `pattern.parseSync()`
 * @return {object} the document to serialize downstream
 */
const buildDocument = (entry, match) => {
  const result = Object.assign({message: entry, tags: []}, match);
  result.tags.push('grok_parsed_success');

  // Trim the double quotes from the user agent, referer and forwarded-for.
  result.userAgent = stripQuotes(result.userAgent);
  result.referer = stripQuotes(result.referer);
  result.forwardedFor = stripQuotes(result.forwardedFor);

  // Grok captures are strings; convert the numeric ones that are aggregated.
  result.bytesSent = +result.bytesSent;
  result.requestTime = +result.requestTime;

  // Prefer the address taken from X-Forwarded-For; fall back to the connecting IP.
  result.originalIP = getForwardedIp(result.forwardedFor) || result.clientIP;

  addGeoip(result);

  // User-agent parsing: keep the device info, expose the parsed user agent
  // as `browser`, and drop the parser object from the output.
  result.device = device(result.userAgent, {parseUserAgent: true});
  result.browser = result.device.parser.useragent;
  delete result.device.parser;

  // Replace the grok `timestamp` capture with an ISO-8601 `@timestamp` field.
  result["@timestamp"] = moment(result.timestamp, 'DD/MMM/YYYY:HH:mm:ss Z', true).toISOString();
  delete result.timestamp;

  return result;
};

/**
 * Kinesis Firehose transformation handler.
 *
 * Each record's base64 `data` is decoded as a UTF-8 log line and parsed with
 * the grok pattern. A record that matches is returned as 'Ok' with the
 * enriched document (JSON, base64-encoded) as its data. A record that does
 * not match, or whose processing throws, is returned unchanged as
 * 'ProcessingFailed'. Errors are contained per record so that one bad line
 * cannot make the whole invocation throw.
 *
 * @param {{records: Array<{recordId: string, data: string}>}} event
 * @param {object} context - unused
 * @param {function(?Error, {records: Array<object>}): void} callback
 */
const transform = (event, context, callback) => {
  let success = 0; // Number of valid entries found
  let failure = 0; // Number of invalid entries found

  const output = event.records.map((record) => {
    try {
      const entry = Buffer.from(record.data, 'base64').toString('utf8');
      const match = pattern.parseSync(entry);
      if (match) {
        const document = buildDocument(entry, match);
        const payload = Buffer.from(JSON.stringify(document), 'utf8').toString('base64');
        success++;
        return {
          recordId: record.recordId,
          result: 'Ok',
          data: payload,
        };
      }
    } catch (err) {
      // Report and fall through: only this record is marked as failed.
      console.error(`Failed to transform record ${record.recordId}: ${err}`);
    }
    // Unparseable or failed record: leave its data intact.
    failure++;
    return {
      recordId: record.recordId,
      result: 'ProcessingFailed',
      data: record.data,
    };
  });

  console.log(`Processing completed. Successful records ${success}, Failed records ${failure}.`);
  callback(null, {records: output});
};

exports.handler = transform;
{
"name": "nginx-access-log-transformer",
"version": "1.0.0",
"description": "Kinesis Firehose transformer for Nginx access logs",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"repository": {
"type": "git",
"url": "ssh://git@stash.resales-online.com:7999/res/nginx-access-log-transformer.git"
},
"author": "Hieu Le <letrunghieu.cse09@gmail.com>",
"license": "UNLICENSED",
"private": true,
"dependencies": {
"countryjs": "^1.8.0",
"device": "^0.3.8",
"geoip-lite": "^1.2.1",
"moment": "^2.18.1",
"node-grok": "^2.0.3",
"useragent": "^2.1.13"
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment