Analyzing Multi-Account WAF Logs with AWS Elasticsearch Service, Amazon Athena and QuickSight Scripts
// JavaScript
// Load the AWS SDK
const aws = require('aws-sdk');
// Construct the AWS S3 client
const s3 = new aws.S3();
// Define variable for the destination (central logging) bucket
var destBucket = "<central logging bucket name>";
// Main handler
exports.handler = (event, context, callback) => {
    // get the source bucket name
    var _srcBucket = event.Records[0].s3.bucket.name;
    // get the object key of the file that landed on S3
    let _key = event.Records[0].s3.object.key;
    // split the key by "/"
    let _keySplit = _key.split("/");
    // get the object name (the last path segment)
    let _objName = _keySplit[_keySplit.length - 1];
    // rebuild the destination path under the Firehose prefix
    let _destPath = _keySplit[0] + "/" + _keySplit[1] + "/<kinesis_firehose_name>/" + _objName;
    // set up the source path
    let _sourcePath = _srcBucket + "/" + _key;
    // build the params for the copyObject request to S3
    let params = { Bucket: destBucket, ACL: "bucket-owner-full-control", CopySource: _sourcePath, Key: _destPath };
    // execute the copyObject request; signal completion from inside the callback
    // so the Lambda is not frozen before the copy actually finishes
    s3.copyObject(params, function(err, data) {
        if (err) {
            console.log(err, err.stack);
            callback(err);
        } else {
            console.log("SUCCESS!");
            callback(null, 'All done!');
        }
    });
};
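
To wire this copy function to a source bucket's ObjectCreated events from code rather than the console, here is a minimal boto3 sketch. The bucket name and function ARN are hypothetical placeholders (not from this gist), and it assumes S3 has already been granted invoke permission on the function:

# Sketch (Python/boto3): subscribe the copy Lambda to ObjectCreated events.
# "waf-logs-source" and the function ARN below are hypothetical placeholders.
import boto3

s3 = boto3.client("s3")
s3.put_bucket_notification_configuration(
    Bucket="waf-logs-source",
    NotificationConfiguration={
        "LambdaFunctionConfigurations": [{
            "LambdaFunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:waf-log-copier",
            "Events": ["s3:ObjectCreated:*"],
        }]
    },
)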
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [database = "crawleddata", table_name = "waf_central_logs_blogpost_agf", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "crawleddata", table_name = "waf_central_logs_blogpost_agf", transformation_ctx = "datasource0")

## @type: ApplyMapping
## @args: [mapping = [("timestamp", "long", "timestamp", "long"), ("formatversion", "int", "formatversion", "int"), ("webaclid", "string", "webaclid", "string"), ("partition_0", "string", "account_id", "string"), ("partition_1", "string", "region", "string"), ("partition_2", "string", "waf_name", "string"), ("httprequest.clientIp", "string", "client_ip", "string"), ("httprequest.country", "string", "country", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("timestamp", "long", "timestamp", "long"), ("formatversion", "int", "formatversion", "int"), ("webaclid", "string", "webaclid", "string"), ("partition_0", "string", "account_id", "string"), ("partition_1", "string", "region", "string"), ("partition_2", "string", "waf_name", "string"), ("httprequest.clientIp", "string", "client_ip", "string"), ("httprequest.country", "string", "country", "string")], transformation_ctx = "applymapping1")

## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")

## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://waf-logs-transformed"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://waf-logs-transformed"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
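
To run this transform outside the Glue console, here is a hedged boto3 sketch; the job name "waf-parquet-transform" is an assumption, so substitute whatever name the job was created under:

# Sketch (Python/boto3): start the Glue job and poll until it reaches a terminal state.
import time
import boto3

glue = boto3.client("glue")
run = glue.start_job_run(JobName="waf-parquet-transform")  # hypothetical job name
while True:
    job_run = glue.get_job_run(JobName="waf-parquet-transform", RunId=run["JobRunId"])
    state = job_run["JobRun"]["JobRunState"]
    print(state)
    if state in ("SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT"):
        break
    time.sleep(30)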
import sys, datetime
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job

args = getResolvedOptions(sys.argv, [])
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init("myjob", args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "crawleddata", table_name = "waf_central_logs_blogpost_agf", transformation_ctx = "datasource0")
resolvechoice2 = ResolveChoice.apply(frame = datasource0, choice = "make_struct", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

# Derive year/month/day partition columns from the millisecond epoch timestamp.
# Map.apply passes each record to f as a dict, so use key access, not attributes.
def MapPartitions(rec):
    d = datetime.datetime.fromtimestamp(rec["timestamp"] * 0.001).date()
    rec["year"] = d.year
    rec["month"] = d.month
    rec["day"] = d.day
    return rec

mapped_dyF = Map.apply(frame = dropnullfields3, f = MapPartitions, transformation_ctx = "mapped_dyF", info = "mapped_dyF")
applymapping1 = ApplyMapping.apply(frame = mapped_dyF, mappings = [("year", "int", "year", "int"), ("month", "int", "month", "int"), ("day", "int", "day", "int"), ("timestamp", "long", "timestamp", "long"), ("formatversion", "int", "formatversion", "int"), ("webaclid", "string", "webaclid", "string"), ("partition_0", "string", "account_id", "string"), ("partition_1", "string", "region", "string"), ("partition_2", "string", "waf_name", "string"), ("httprequest.clientIp", "string", "client_ip", "string"), ("httprequest.country", "string", "country", "string")], transformation_ctx = "applymapping1")
# Write the mapped (renamed) frame, partitioned by year and month
datasink4 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://waf-logs-transformed", "partitionKeys": ["year", "month"]}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
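
Once the partitioned output in s3://waf-logs-transformed has been crawled back into the catalog, Athena can prune on the year/month partitions. Here is a sketch of one such query via boto3; the table name and results bucket are assumptions, not from this gist:

# Sketch (Python/boto3): run a partition-pruned Athena query over the Parquet output.
# "waf_logs_transformed" and "s3://waf-athena-results/" are hypothetical names.
import boto3

athena = boto3.client("athena")
resp = athena.start_query_execution(
    QueryString=(
        "SELECT country, COUNT(*) AS hits "
        "FROM waf_logs_transformed "
        "WHERE year = 2021 AND month = 9 "
        "GROUP BY country ORDER BY hits DESC"
    ),
    QueryExecutionContext={"Database": "crawleddata"},
    ResultConfiguration={"OutputLocation": "s3://waf-athena-results/"},
)
print(resp["QueryExecutionId"])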
// JavaScript Code
/*
MIT No Attribution

Permission is hereby granted, free of charge, to any person obtaining a copy of this
software and associated documentation files (the "Software"), to deal in the Software
without restriction, including without limitation the rights to use, copy, modify,
merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

/* Imports */
var AWS = require('aws-sdk');
var LineStream = require('byline').LineStream;
var parse = require('clf-parser'); // Apache Common Log Format
var stream = require('stream');

/* Globals */
var esDomain = {
    endpoint: process.env.endpoint,
    region: process.env.region,
    index: process.env.es_index,
    doctype: process.env.doctype
};
console.log("ES DOMAIN: ");
console.log(JSON.stringify(esDomain));
var endpoint = new AWS.Endpoint(esDomain.endpoint);
var s3 = new AWS.S3();
var totLogLines = 0;   // Total number of log lines in the file
var numDocsAdded = 0;  // Number of log lines added to ES so far

/*
 * The AWS credentials are picked up from the environment.
 * They belong to the IAM role assigned to the Lambda function.
 * Since the ES requests are signed using these credentials,
 * make sure to apply a policy that permits ES domain operations
 * to the role.
 */
var creds = new AWS.EnvironmentCredentials('AWS');

/*
 * Get the log file from the given S3 bucket and key. Parse it and add
 * each log record to the ES domain.
 */
function s3LogsToES(bucket, key, context, lineStream, recordStream) {
    // Note: The Lambda function should be configured to filter for .log files
    // (as part of the Event Source "suffix" setting).
    var s3Stream = s3.getObject({Bucket: bucket, Key: key}).createReadStream();

    // Flow: S3 file stream -> Log Line stream -> Log Record stream -> ES
    s3Stream
        .pipe(lineStream)
        .pipe(recordStream)
        .on('data', function(parsedEntry) {
            console.log("Sending to ES:");
            console.log(parsedEntry);
            var data = JSON.parse(parsedEntry);
            console.log(data.remote_addr);
            // Wrap the value in an object so the request body is valid JSON;
            // posting a bare string triggers a mapper_parsing_exception in ES.
            postDocumentToES(JSON.stringify({remote_addr: data.remote_addr}), context);
        });

    s3Stream.on('error', function() {
        console.log(
            'Error getting object "' + key + '" from bucket "' + bucket + '". ' +
            'Make sure they exist and your bucket is in the same region as this function.');
        context.fail();
    });
}

/*
 * Add the given document to the ES domain.
 * If all records are successfully added, indicate success to lambda
 * (using the "context" parameter).
 */
function postDocumentToES(doc, context) {
    var req = new AWS.HttpRequest(endpoint);

    req.method = 'POST';
    req.path = "/" + esDomain.index + "/" + esDomain.doctype;
    req.region = esDomain.region;
    req.body = doc;
    req.headers['presigned-expires'] = false;
    req.headers['Content-Type'] = "application/json";
    req.headers['Host'] = endpoint.host;

    // Sign the request (SigV4)
    var signer = new AWS.Signers.V4(req, 'es');
    signer.addAuthorization(creds, new Date());

    var send = new AWS.NodeHttpClient();
    send.handleRequest(req, null, function(httpResp) {
        var body = '';
        httpResp.on('data', function(chunk) {
            body += chunk;
        });
        httpResp.on('end', function(chunk) {
            numDocsAdded++;
            console.log(body);
            if (numDocsAdded === totLogLines) {
                // Mark lambda success. If not done so, it will be retried.
                console.log('All ' + numDocsAdded + ' log records added to ES.');
                context.succeed();
            }
        });
    }, function(err) {
        console.log('Error: ' + err);
        console.log(numDocsAdded + ' of ' + totLogLines + ' log records added to ES.');
        context.fail();
    });
}

/* Lambda "main": Execution starts here */
exports.handler = function(event, context) {
    console.log('Received event: ', JSON.stringify(event, null, 2));

    /* == Streams ==
     * To avoid loading an entire (typically large) log file into memory,
     * this is implemented as a pipeline of filters, streaming log data
     * from S3 to ES.
     * Flow: S3 file stream -> Log Line stream -> Log Record stream -> ES
     */
    var lineStream = new LineStream();
    // A stream of log records, from parsing each log line
    var recordStream = new stream.Transform({objectMode: true});
    recordStream._transform = function(line, encoding, done) {
        var logRecord = parse(line.toString());
        var serializedRecord = JSON.stringify(logRecord);
        this.push(serializedRecord);
        totLogLines++;
        done();
    };

    event.Records.forEach(function(record) {
        var bucket = record.s3.bucket.name;
        var objKey = decodeURIComponent(record.s3.object.key.replace(/\+/g, ' '));
        s3LogsToES(bucket, objKey, context, lineStream, recordStream);
    });
};
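
To smoke-test the indexer without waiting for a real object to land, here is a sketch that invokes it with a synthetic S3 event; the function, bucket, and key names are hypothetical placeholders:

# Sketch (Python/boto3): invoke the indexing Lambda with a hand-built S3 event.
import json
import boto3

lam = boto3.client("lambda")
event = {
    "Records": [{
        "s3": {
            "bucket": {"name": "waf-logs-central"},       # hypothetical bucket
            "object": {"key": "aws-waf-logs/sample.log"}  # hypothetical key
        }
    }]
}
resp = lam.invoke(FunctionName="waf-logs-to-es", Payload=json.dumps(event).encode())
print(resp["StatusCode"], resp["Payload"].read().decode())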
Check the Lambda logs in CloudWatch for more accurate debugging. I'm not sure why this error is coming back from ES.
Hi, I read your blog post at https://aws.amazon.com/blogs/big-data/analyzing-aws-waf-logs-with-amazon-es-amazon-athena-and-amazon-quicksight/
Could you please give more guidance on the Glue and Athena steps? I'm stuck there.
Thank you very much!
Hi, I have:

{
  "error": {
    "root_cause": [
      {
        "type": "mapper_parsing_exception",
        "reason": "failed to parse"
      }
    ],
    "type": "mapper_parsing_exception",
    "reason": "failed to parse",
    "caused_by": {
      "type": "json_e_o_f_exception",
      "reason": "Unexpected end-of-input in VALUE_STRING\n at [Source: org.elasticsearch.common.bytes.BytesReference$MarkSupportingStreamInputWrapper@69223f7c; line: 1, column: 1153]"
    }
  },
  "status": 400
}

when trying to use your s3-trigger-handler.js.
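
A json_e_o_f_exception like this generally means the request body ES received was not a complete JSON document, which is what happens if a bare or truncated string is posted. Here is a minimal sketch against a hypothetical unsigned dev Elasticsearch endpoint (a real AWS ES domain needs SigV4 signing, as in the Lambda above) showing the difference:

# Sketch (Python/requests): an unterminated string body fails to parse;
# a complete JSON object succeeds. Endpoint and index are hypothetical.
import json
import requests

url = "http://localhost:9200/waf-logs/_doc"
headers = {"Content-Type": "application/json"}

# Reproduces the error above: the JSON parser hits end-of-input mid-string.
bad = requests.post(url, data='"an unterminated string', headers=headers)
print(bad.status_code, bad.text)   # 400, mapper_parsing_exception / json_e_o_f_exception

# A complete JSON document indexes cleanly.
good = requests.post(url, data=json.dumps({"remote_addr": "192.0.2.1"}), headers=headers)
print(good.status_code, good.text)  # 201, document created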