@aaronfranco · Last active September 21, 2021
Scripts for analyzing multi-account AWS WAF logs with Amazon Elasticsearch Service, Amazon Athena, and Amazon QuickSight
// JavaScript
// Lambda function that copies a WAF log object from the source bucket into the
// central logging bucket, rewriting the key so the object lands under the
// Kinesis Data Firehose delivery stream prefix.

// Load the AWS SDK
const aws = require('aws-sdk');
// Construct the AWS S3 object
const s3 = new aws.S3();
// Define a variable for the destination bucket
var destBucket = "<central logging bucket name>";

// Main function
exports.handler = (event, context, callback) => {
    // Get the source bucket name
    var _srcBucket = event.Records[0].s3.bucket.name;
    // Get the object key of the file that landed on S3
    let _key = event.Records[0].s3.object.key;
    // Split the key by "/"
    let _keySplit = _key.split("/");
    // Get the object name
    let _objName = _keySplit[_keySplit.length - 1];
    // Build the destination path: <account_id>/<region>/<kinesis_firehose_name>/<object name>
    let _destPath = _keySplit[0] + "/" + _keySplit[1] + "/<kinesis_firehose_name>/" + _objName;
    // Set up the source path
    let _sourcePath = _srcBucket + "/" + _key;
    // Build the params for the copyObject request to S3
    let params = {
        Bucket: destBucket,
        ACL: "bucket-owner-full-control",
        CopySource: _sourcePath,
        Key: _destPath
    };
    // Execute the copyObject request and report the result back to Lambda
    // only once the copy has actually finished.
    s3.copyObject(params, function(err, data) {
        if (err) {
            console.log(err, err.stack);
            callback(err);
        } else {
            console.log("SUCCESS!");
            callback(null, 'All done!');
        }
    });
};
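For readers who would rather deploy the replication function in Python, here is a minimal sketch of the same copy logic with boto3. The bucket name, Firehose prefix, handler name, and event shape are placeholders and assumptions, not values from the original gist:

# Python (illustrative sketch only): copy an incoming WAF log object into the
# central logging bucket under the Firehose delivery stream prefix.
import boto3

s3 = boto3.client("s3")
DEST_BUCKET = "<central logging bucket name>"   # placeholder
FIREHOSE_PREFIX = "<kinesis_firehose_name>"     # placeholder

def handler(event, context):
    record = event["Records"][0]["s3"]
    src_bucket = record["bucket"]["name"]
    key = record["object"]["key"]
    parts = key.split("/")
    # Rebuild the key as <account_id>/<region>/<firehose_name>/<object_name>
    dest_key = "/".join([parts[0], parts[1], FIREHOSE_PREFIX, parts[-1]])
    s3.copy_object(
        Bucket=DEST_BUCKET,
        Key=dest_key,
        CopySource={"Bucket": src_bucket, "Key": key},
        ACL="bucket-owner-full-control",
    )
    return "All done!"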
# Python (AWS Glue ETL job): flatten the crawled WAF logs and write them to S3 as Parquet.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "crawleddata", table_name = "waf_central_logs_blogpost_agf", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "crawleddata", table_name = "waf_central_logs_blogpost_agf", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("timestamp", "long", "timestamp", "long"), ("formatversion", "int", "formatversion", "int"), ("webaclid", "string", "webaclid", "string"), ("partition_0", "string", "account_id", "string"), ("partition_1", "string", "region", "string"), ("partition_2", "string", "waf_name", "string"), ("httprequest.clientIp", "string", "client_ip", "string"), ("httprequest.country", "string", "country", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("timestamp", "long", "timestamp", "long"), ("formatversion", "int", "formatversion", "int"), ("webaclid", "string", "webaclid", "string"), ("partition_0", "string", "account_id", "string"), ("partition_1", "string", "region", "string"), ("partition_2", "string", "waf_name", "string"), ("httprequest.clientIp", "string", "client_ip", "string"), ("httprequest.country", "string", "country", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://waf-logs-transformed"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://waf-logs-transformed"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
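Once the Parquet files in s3://waf-logs-transformed have been crawled (or a table has been defined over them), the data can be queried from Athena. A minimal sketch with boto3 follows; the table name waf_logs_transformed and the query-results bucket are assumptions for illustration only:

# Python (illustrative sketch only): run an Athena query over the transformed data.
# The table name and results bucket are assumptions, not part of the original gist.
import boto3

athena = boto3.client("athena")

response = athena.start_query_execution(
    QueryString=(
        "SELECT country, COUNT(*) AS requests "
        "FROM waf_logs_transformed "
        "GROUP BY country "
        "ORDER BY requests DESC "
        "LIMIT 10"
    ),
    QueryExecutionContext={"Database": "crawleddata"},
    ResultConfiguration={"OutputLocation": "s3://<athena-query-results-bucket>/"},
)
print(response["QueryExecutionId"])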
# Python (AWS Glue ETL job): variant of the job above that derives year/month/day
# columns from the timestamp and writes the Parquet output partitioned by year and month.
import sys, datetime
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, [])
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init("myjob", args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "crawleddata", table_name = "waf_central_logs_blogpost_agf", transformation_ctx = "datasource0")
resolvechoice2 = ResolveChoice.apply(frame = datasource0, choice = "make_struct", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
def MapPartitions(rec):
    # Derive year/month/day columns from the epoch-millisecond timestamp.
    # Glue's Map.apply passes each record to this function as a dict.
    d = datetime.datetime.fromtimestamp(rec["timestamp"] * 0.001).date()
    rec["year"] = d.year
    rec["month"] = d.month
    rec["day"] = d.day
    return rec
mapped_dyF = Map.apply(frame = dropnullfields3, f = MapPartitions, transformation_ctx = "mapped_dyF", info = "mapped_dyF")
applymapping1 = ApplyMapping.apply(frame = mapped_dyF, mappings = [("year", "int", "year", "int"), ("month", "int", "month", "int"), ("day", "int", "day", "int"),("timestamp", "long", "timestamp", "long"), ("formatversion", "int", "formatversion", "int"), ("webaclid", "string", "webaclid", "string"), ("partition_0", "string", "account_id", "string"), ("partition_1", "string", "region", "string"), ("partition_2", "string", "waf_name", "string"), ("httprequest.clientIp", "string", "client_ip", "string"), ("httprequest.country", "string", "country", "string")], transformation_ctx = "applymapping1")
datasink4 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://waf-logs-transformed", "partitionKeys": ["year", "month"]}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
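Because this variant writes Hive-style year=/month= prefixes, those partitions have to be registered in the catalog before Athena can see them. A minimal sketch, again assuming a hypothetical table name waf_logs_transformed and a placeholder results bucket:

# Python (illustrative sketch only): register newly written year=/month= partitions.
import boto3

athena = boto3.client("athena")

# MSCK REPAIR TABLE scans the table's S3 location and adds any Hive-style
# partitions that are not yet present in the Glue Data Catalog.
athena.start_query_execution(
    QueryString="MSCK REPAIR TABLE waf_logs_transformed",
    QueryExecutionContext={"Database": "crawleddata"},
    ResultConfiguration={"OutputLocation": "s3://<athena-query-results-bucket>/"},
)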
// JavaScript
// Lambda function that streams a log file from S3, parses each line, and indexes
// the resulting records into the Amazon ES domain.
/*
MIT No Attribution
Permission is hereby granted, free of charge, to any person obtaining a copy of this
software and associated documentation files (the "Software"), to deal in the Software
without restriction, including without limitation the rights to use, copy, modify,
merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
/* Imports */
var AWS = require('aws-sdk');
var LineStream = require('byline').LineStream;
var parse = require('clf-parser'); // Apache Common Log Format
var path = require('path');
var stream = require('stream');
var http = require("http");
/* Globals */
var esDomain = {
    endpoint: process.env.endpoint,
    region: process.env.region,
    index: process.env.es_index,
    doctype: process.env.doctype
};
console.log("ES DOMAIN: ");
console.log(JSON.stringify(esDomain));
var endpoint = new AWS.Endpoint(esDomain.endpoint);
var s3 = new AWS.S3();
var totLogLines = 0;    // Total number of log lines in the file
var numDocsAdded = 0;   // Number of log lines added to ES so far
var options = {
    host: esDomain.endpoint,
    path: "/" + esDomain.index,
    method: "PUT",
    headers: {
        "Content-Type": "application/json",
        "presigned-expires": false
    }
};
/*
* The AWS credentials are picked up from the environment.
* They belong to the IAM role assigned to the Lambda function.
* Since the ES requests are signed using these credentials,
* make sure to apply a policy that permits ES domain operations
* to the role.
*/
var creds = new AWS.EnvironmentCredentials('AWS');
/*
* Get the log file from the given S3 bucket and key. Parse it and add
* each log record to the ES domain.
*/
function s3LogsToES(bucket, key, context, lineStream, recordStream) {
    // Note: The Lambda function should be configured to filter for .log files
    // (as part of the Event Source "suffix" setting).
    var s3Stream = s3.getObject({Bucket: bucket, Key: key}).createReadStream();
    // Flow: S3 file stream -> Log Line stream -> Log Record stream -> ES
    s3Stream
        .pipe(lineStream)
        .pipe(recordStream)
        .on('data', function(parsedEntry) {
            console.log("Sending to ES:");
            console.log(parsedEntry);
            console.log(typeof parsedEntry);
            let data = JSON.parse(parsedEntry);
            console.log(data.remote_addr);
            // ES expects a JSON document body, so wrap the field in an object.
            postDocumentToES(JSON.stringify({remote_addr: data.remote_addr}), context);
        });
    s3Stream.on('error', function() {
        console.log(
            'Error getting object "' + key + '" from bucket "' + bucket + '". ' +
            'Make sure they exist and your bucket is in the same region as this function.');
        context.fail();
    });
}
/*
* Add the given document to the ES domain.
* If all records are successfully added, indicate success to lambda
* (using the "context" parameter).
*/
function postDocumentToES(doc, context) {
    var req = new AWS.HttpRequest(endpoint);
    req.method = 'POST';
    req.path = "/" + esDomain.index + "/" + esDomain.doctype;
    req.region = esDomain.region;
    req.body = doc;
    req.headers['presigned-expires'] = false;
    req.headers['Content-Type'] = "application/json";
    req.headers['Host'] = endpoint.host;
    // Sign the request (SigV4)
    var signer = new AWS.Signers.V4(req, 'es');
    signer.addAuthorization(creds, new Date());
    var send = new AWS.NodeHttpClient();
    send.handleRequest(req, null, function(httpResp) {
        var body = '';
        httpResp.on('data', function(chunk) {
            body += chunk;
        });
        httpResp.on('end', function(chunk) {
            numDocsAdded++;
            console.log(body);
            if (numDocsAdded === totLogLines) {
                // Mark lambda success. If not done so, it will be retried.
                console.log('All ' + numDocsAdded + ' log records added to ES.');
                context.succeed();
            }
        });
    }, function(err) {
        console.log('Error: ' + err);
        console.log(numDocsAdded + ' of ' + totLogLines + ' log records added to ES.');
        context.fail();
    });
}
/* Lambda "main": Execution starts here */
exports.handler = function(event, context) {
    console.log('Received event: ', JSON.stringify(event, null, 2));
    /* == Streams ==
     * To avoid loading an entire (typically large) log file into memory,
     * this is implemented as a pipeline of filters, streaming log data
     * from S3 to ES.
     * Flow: S3 file stream -> Log Line stream -> Log Record stream -> ES
     */
    var lineStream = new LineStream();
    // A stream of log records, from parsing each log line
    var recordStream = new stream.Transform({objectMode: true});
    recordStream._transform = function(line, encoding, done) {
        var logRecord = parse(line.toString());
        var serializedRecord = JSON.stringify(logRecord);
        this.push(serializedRecord);
        totLogLines++;
        done();
    };
    console.log(JSON.stringify(event));
    event.Records.forEach(function(record) {
        var bucket = record.s3.bucket.name;
        var objKey = decodeURIComponent(record.s3.object.key.replace(/\+/g, ' '));
        s3LogsToES(bucket, objKey, context, lineStream, recordStream);
    });
};
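For testing the ES side outside of Lambda, the same SigV4-signed indexing request can be sketched in Python with botocore's signer and the requests library (assumed to be installed). The region, endpoint, index, doctype, and document below are placeholders:

# Python (illustrative sketch only): index one document into the ES domain
# using SigV4-signed credentials. All names below are placeholders.
import json
import boto3
import requests
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest

region = "<region>"
url = "https://<your-es-domain-endpoint>/<es_index>/<doctype>"
doc = json.dumps({"remote_addr": "203.0.113.10"})

# Sign the request with the caller's AWS credentials (service name "es")
credentials = boto3.Session().get_credentials()
request = AWSRequest(method="POST", url=url, data=doc,
                     headers={"Content-Type": "application/json"})
SigV4Auth(credentials, "es", region).add_auth(request)

response = requests.post(url, data=doc, headers=dict(request.headers))
print(response.status_code, response.text)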
@jacekfeldman commented:

Hi, I get the following error when I try to use your s3-trigger-handler.js:

{
  "error": {
    "root_cause": [
      {
        "type": "mapper_parsing_exception",
        "reason": "failed to parse"
      }
    ],
    "type": "mapper_parsing_exception",
    "reason": "failed to parse",
    "caused_by": {
      "type": "json_e_o_f_exception",
      "reason": "Unexpected end-of-input in VALUE_STRING\n at [Source: org.elasticsearch.common.bytes.BytesReference$MarkSupportingStreamInputWrapper@69223f7c; line: 1, column: 1153]"
    }
  },
  "status": 400
}

@aaronfranco (author) replied:

Check the Lambda logs in CloudWatch for more accurate debugging. I'm not sure why this error is happening from ES.
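(For reference, recent invocation logs for the function can be pulled with boto3; the log group name below is a hypothetical example, since Lambda log groups follow the /aws/lambda/<function-name> pattern:)

# Python (illustrative sketch only): fetch the last hour of Lambda log events.
import time
import boto3

logs = boto3.client("logs")

response = logs.filter_log_events(
    logGroupName="/aws/lambda/<your-function-name>",   # hypothetical
    startTime=int((time.time() - 3600) * 1000),        # last hour, epoch milliseconds
)
for event in response["events"]:
    print(event["message"], end="")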

@awsstudygroup2021 commented:

Hi, I read your blog post at https://aws.amazon.com/blogs/big-data/analyzing-aws-waf-logs-with-amazon-es-amazon-athena-and-amazon-quicksight/. Could you please give more guidance on the Glue and Athena steps? I'm stuck there. Thank you very much!
