Skip to content

Instantly share code, notes, and snippets.

Last active March 13, 2024 19:15
Show Gist options
  • Save fzakaria/4f93a8dbf483695fb7d5 to your computer and use it in GitHub Desktop.
Save fzakaria/4f93a8dbf483695fb7d5 to your computer and use it in GitHub Desktop.
Send CloudTrail events to CloudSearch with AWS Lambda
console.log('Loading event');
var async = require('async');
var jpath = require('json-path');
var zlib = require('zlib');
var aws = require('aws-sdk');

// S3 client, pinned to a fixed API version.
var s3 = new aws.S3({
    apiVersion: '2006-03-01'
});

// CloudSearch document-service client, pinned to a fixed API version.
// The SDK refuses to construct a CloudSearchDomain client without an explicit
// `endpoint` (the domain's document endpoint).
// NOTE(review): the endpoint value is not visible in this file; reading it
// from the CLOUDSEARCH_ENDPOINT environment variable is an assumption --
// adjust the name/source to match your deployment.
var csd = new aws.CloudSearchDomain({
    endpoint: process.env.CLOUDSEARCH_ENDPOINT,
    apiVersion: '2013-01-01'
});
//These mappings use json-path
// Key   = name of the field in the search document that gets built.
// Value = json-path expression evaluated against a single record to obtain
//         that field's value.
var MAPPING = {
    "aws_region": "#/awsRegion",
    "error_message": "#/errorMessage",
    "event_id": "#/eventID",
    "event_name": "#/eventName",
    "event_source": "#/eventSource",
    "event_time": "#/eventTime",
    "source_ip_address": "#/sourceIPAddress",
    "user_agent": "#/userAgent",
    "user_identity_type": "#/userIdentity/type",
    "user_identity_arn": "#/userIdentity/arn",
    "user_identity_account_id": "#/userIdentity/accountId",
    "user_identity_user_name": "#/userIdentity/userName"
};
/**
 * Build one "add" operation for a document batch.
 *
 * @param {string} id - Identifier of the document.
 * @param {Object} fields - Field name -> value map stored on the document.
 * @returns {{type: string, id: string, fields: Object}} The operation object;
 *          `fields` is the same object that was passed in (not copied).
 */
function create_cs_request(id, fields) {
    // Declared with `var` so the object stays local: a bare assignment would
    // create an implicit global (and throws outright in strict mode).
    var request = {
        type: 'add',
        id: id,
        fields: fields
    };
    return request;
}
/**
 * Fetch a gzipped JSON object from S3, decompress and parse it, and hand back
 * the entries matched by the json-path "#/Records[*]".
 *
 * NOTE(review): the `async.waterfall([` and `s3.getObject({` lines were absent
 * from the copy of this file under review and have been reconstructed from
 * the surrounding arguments -- confirm against the original.
 *
 * @param {string} bucket - Name of the S3 bucket.
 * @param {string} key - Key of the gzipped JSON object.
 * @param {function(Error, Array)} cb - Called with (err) on failure or
 *        (null, records) on success.
 */
function get_s3_gz_json(bucket, key, cb) {
    async.waterfall([
        //get json.gz
        function(callback) {
            s3.getObject({
                Bucket: bucket,
                Key: key
            }, function(err, data) {
                // Bail out before touching `data`: it is not populated when
                // the request failed, so `data.Body` would throw.
                if (err) {
                    return callback(err);
                }
                console.log("Finished collecting S3 Object");
                callback(null, data.Body);
            });
        },
        //gunzip the s3 object
        function(gz_json, callback) {
            zlib.gunzip(gz_json, function(err, dezipped) {
                if (err) {
                    return callback(err);
                }
                var json;
                try {
                    json = JSON.parse(dezipped.toString('utf-8'));
                } catch (parse_err) {
                    // Report malformed JSON through the callback instead of
                    // throwing from inside an asynchronous callback.
                    return callback(parse_err);
                }
                callback(null, json);
            });
        },
        //get the records
        function(json, callback) {
            var records = jpath.resolve(json, "#/Records[*]");
            console.log("Found the following records", records);
            callback(null, records);
        }
    ], function(err, result) {
        cb(err, result);
    });
}
/**
 * Convert each record into an "add" request by evaluating every json-path in
 * MAPPING against it. The document id is the record's "event_id" field.
 *
 * NOTE(review): the `async.map(records,` line was absent from the copy of this
 * file under review and has been reconstructed from the iterator/completion
 * callback pair -- confirm against the original.
 *
 * @param {Array<Object>} records - Records to convert.
 * @param {function(Error, Array<Object>)} callback - Called with
 *        (err, requests), one request per input record.
 */
function download_records(records, callback) {
    async.map(records,
        function(item, cb) {
            // All per-record state is local; the previous bare assignments
            // leaked `fields`, `cs_request`, etc. as globals.
            var fields = {};
            Object.keys(MAPPING).forEach(function(prop) {
                var matches = jpath.resolve(item, MAPPING[prop]);
                fields[prop] = matches[0]; //jpath always returns a list!
            });
            var cs_request = create_cs_request(fields["event_id"], fields);
            console.log("created request", cs_request);
            cb(null, cs_request);
        },
        function(err, record_requests) {
            callback(err, record_requests);
        });
}
/**
 * Upload a batch of document requests as a single JSON payload via
 * `csd.uploadDocuments`.
 *
 * @param {Array<Object>} requests - Document operations to upload.
 * @param {function(Error, Object)} callback - Called with the upload's
 *        (err, data) once the call completes.
 */
function send_record_requests(requests, callback) {
    console.log("Publishing the following documents", requests);
    var params = {
        contentType: 'application/json',
        documents: JSON.stringify(requests)
    };
    csd.uploadDocuments(params, function(err, data) {
        // Always report the outcome; without this the caller waiting on
        // `callback` would never be notified.
        callback(err, data);
    });
}
exports.handler = function(event, context) {
console.log('Received event:');
console.log(JSON.stringify(event, null, ' '));
// Get the object from the event and show its content type
var bucket = event.Records[0];
var key = event.Records[0].s3.object.key;
var perform_task = async.compose(send_record_requests, download_records, get_s3_gz_json);
perform_task(bucket, key, function(err, result) {
if (err) {
context.done("Error performing task: " + err);
} else {
context.done(null, '');
Copy link

Is "error_message": "#/errorMessage", really in the record?

Copy link

reetu40 commented Mar 13, 2024

@shahzadmasud Did you use AWS CloudSearch for that? I have a scenario with over 90 million documents in DynamoDB.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment