Skip to content

Instantly share code, notes, and snippets.

@shahzadmasud
Forked from fzakaria/lambda_cloudsearch.js
Created January 7, 2016 09:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shahzadmasud/ead564374b8c288a19bd to your computer and use it in GitHub Desktop.
Save shahzadmasud/ead564374b8c288a19bd to your computer and use it in GitHub Desktop.
Send CloudTrail events to CloudSearch with AWS Lambda
console.log('Loading event');
// CloudSearch document-service endpoint that the CloudSearchDomain client
// below is pointed at. It is read from the Lambda function's environment
// (set the CLOUDSEARCH_ENDPOINT variable on the function) so that this file
// parses and loads without being hand-edited.
// NOTE(review): when the variable is unset this is undefined; the AWS SDK is
// expected to reject a CloudSearchDomain client without an endpoint - confirm.
var CLOUDSEARCH_ENDPOINT = process.env.CLOUDSEARCH_ENDPOINT;
var async = require('async');
var jpath = require('json-path')
var zlib = require('zlib');
var aws = require('aws-sdk');
// S3 client (API version pinned) used to download the gzipped log objects.
var s3 = new aws.S3({ apiVersion: '2006-03-01' });

// CloudSearch domain client (API version pinned) used to upload document
// batches to the endpoint configured in CLOUDSEARCH_ENDPOINT.
var csd = new aws.CloudSearchDomain({
    apiVersion: '2013-01-01',
    endpoint: CLOUDSEARCH_ENDPOINT
});
// Maps each CloudSearch document field name (key) to the json-path pointer
// (value) that is resolved against a single log record to obtain its value.
// Pointer syntax: https://github.com/flitbit/json-path
//
// Declared with `var` so it is a module-scoped binding rather than an
// implicit global (which throws a ReferenceError under strict mode).
var MAPPING = {
    "aws_region": "#/awsRegion",
    "error_message": "#/errorMessage",
    "event_id": "#/eventID",
    "event_name": "#/eventName",
    "event_source": "#/eventSource",
    "event_time": "#/eventTime",
    "source_ip_address": "#/sourceIPAddress",
    "user_agent": "#/userAgent",
    "user_identity_type": "#/userIdentity/type",
    "user_identity_arn": "#/userIdentity/arn",
    "user_identity_account_id": "#/userIdentity/accountId",
    "user_identity_user_name": "#/userIdentity/userName"
};
// Builds one CloudSearch "add" operation for a document batch.
// Batch format: http://docs.aws.amazon.com/cloudsearch/latest/developerguide/preparing-data.html
//
// id     - document id for the operation
// fields - object of field name -> value; stored by reference, not copied
//
// Returns a new { type, id, fields } object. The result is a local object
// literal, so no state is shared between calls (the previous version wrote
// to an undeclared `request` variable, i.e. an implicit global).
function create_cs_request(id, fields) {
    return {
        type: 'add',
        id: id,
        fields: fields
    };
}
// Downloads a gzipped JSON object from S3, decompresses and parses it, and
// yields the entries selected by the json-path pointer "#/Records[*]".
//
// bucket - S3 bucket name
// key    - S3 object key
// cb     - node-style callback: cb(err) on failure, cb(null, records) on success
//
// Every failure (S3 error, gunzip error, malformed JSON) is reported through
// `cb` instead of being thrown from inside an asynchronous callback.
function get_s3_gz_json(bucket, key, cb) {
    async.waterfall([
        // Step 1: fetch the raw .json.gz body.
        function(callback) {
            s3.getObject({
                Bucket: bucket,
                Key: key
            }, function(err, data) {
                // Bail out before touching `data`, which carries no body
                // when the request failed.
                if (err) {
                    return callback(err);
                }
                console.log("Finished collecting S3 Object");
                callback(null, data.Body);
            });
        },
        // Step 2: gunzip the body and parse it as JSON.
        function(gz_json, callback) {
            zlib.gunzip(gz_json, function(err, dezipped) {
                if (err) {
                    return callback(err);
                }
                var json;
                // Only the parse is guarded; calling `callback` inside the
                // try would risk invoking it twice if a later step throws.
                try {
                    json = JSON.parse(dezipped.toString('utf-8'));
                } catch (parse_err) {
                    return callback(parse_err);
                }
                callback(null, json);
            });
        },
        // Step 3: pull the record list out of the parsed document.
        function(json, callback) {
            var records = jpath.resolve(json, "#/Records[*]");
            console.log("Found the following records", records);
            callback(null, records);
        }
    ], function(err, result) {
        cb(err, result);
    });
}
// Converts log records into CloudSearch "add" requests (despite the name,
// nothing is downloaded here).
//
// records  - array of record objects
// callback - node-style callback: callback(err, requests), where `requests`
//            has one entry per record, built by create_cs_request
//
// For every record each MAPPING pointer is resolved and the first match is
// stored under the corresponding CloudSearch field name; the resolved
// "event_id" field doubles as the document id.
function download_records(records, callback) {
    async.concat(records,
        function(item, cb) {
            // All working variables are locals; the previous version leaked
            // them as implicit globals shared across records.
            var fields = {};
            for (var prop in MAPPING) {
                // Skip anything inherited through the prototype chain.
                if (!Object.prototype.hasOwnProperty.call(MAPPING, prop)) {
                    continue;
                }
                // jpath.resolve returns a list; only the first match is kept.
                fields[prop] = jpath.resolve(item, MAPPING[prop])[0];
            }
            var cs_request = create_cs_request(fields["event_id"], fields);
            console.log("created request", cs_request);
            cb(null, cs_request);
        },
        function(err, record_requests) {
            callback(err, record_requests);
        });
}
// Uploads a batch of document requests to CloudSearch in a single call.
//
// requests - array of request objects; serialized to JSON for the upload
// callback - invoked as callback(err) once the upload finishes; the
//            response payload from CloudSearch is not forwarded
function send_record_requests(requests, callback) {
    console.log("Publishing the following documents", requests);
    csd.uploadDocuments({
        contentType: 'application/json',
        documents: JSON.stringify(requests)
    }, function(upload_err) {
        callback(upload_err);
    });
}
exports.handler = function(event, context) {
console.log('Received event:');
console.log(JSON.stringify(event, null, ' '));
// Get the object from the event and show its content type
var bucket = event.Records[0].s3.bucket.name;
var key = event.Records[0].s3.object.key;
var perform_task = async.compose(send_record_requests, download_records, get_s3_gz_json);
perform_task(bucket, key, function(err, result) {
if (err) {
context.done("Error performing task: " + err);
} else {
context.done(null, '');
}
});
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment