Created
March 16, 2017 05:25
-
-
Save wbchn/eb548c68bcfd1f71caf43029b61d08d3 to your computer and use it in GitHub Desktop.
Dynamodb Stream to Elasticsearch
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * Reference:
 * https://aws.amazon.com/blogs/aws/amazon-elasticsearch-service-support-for-es-5-1/
 */
const AWS = require('aws-sdk');
const path = require('path');

// Settings for the target Elasticsearch domain. Region, endpoint and index
// are read from the Lambda environment variables; the document type is fixed.
const esDomain = {
    region: process.env.RegionForES,
    endpoint: process.env.EndpointForES,
    index: process.env.IndexForES,
    doctype: 'onboardingrecords'
};

// AWS endpoint object built from the configured ES domain endpoint.
const endpoint = new AWS.Endpoint(esDomain.endpoint);

// Credentials are read from the environment (variables prefixed with "AWS").
const creds = new AWS.EnvironmentCredentials('AWS');

console.log('Loading function');
exports.handler = (event, context, callback) => { | |
//console.log('Received event:', JSON.stringify(event, null, 2)); | |
console.log(JSON.stringify(esDomain)); | |
event.Records.forEach((record) => { | |
console.log(record.eventID); | |
console.log(record.eventName); | |
console.log('DynamoDB Record: %j', record.dynamodb); | |
var dbRecord = JSON.stringify(record.dynamodb); | |
postToES(dbRecord, context, callback); | |
}); | |
}; | |
/**
 * POSTs one JSON document to `/<index>/<doctype>` on the configured
 * Elasticsearch endpoint, signing the request with AWS Signature V4.
 *
 * @param {string} doc - Serialized JSON document used as the request body.
 * @param {Object} context - Lambda context (accepted for interface
 *     compatibility; not used here).
 * @param {Function} lambdaCallback - Node-style callback. Called once with
 *     `(null, message)` on a 2xx response, or with an error string when the
 *     request fails or Elasticsearch answers with a non-2xx status.
 */
function postToES(doc, context, lambdaCallback) {
    // Make sure the callback fires at most once, even if an error is
    // reported after a response has already been handled.
    let settled = false;
    const finish = (err, result) => {
        if (settled) {
            return;
        }
        settled = true;
        if (err) {
            lambdaCallback(err);
        } else {
            lambdaCallback(null, result);
        }
    };

    const req = new AWS.HttpRequest(endpoint);
    req.method = 'POST';
    // URL paths always use forward slashes, independent of the host OS.
    req.path = path.posix.join('/', esDomain.index, esDomain.doctype);
    req.region = esDomain.region;
    req.headers['presigned-expires'] = false;
    req.headers['Host'] = endpoint.host;
    req.headers['Content-Type'] = 'application/json';
    req.body = doc;

    const signer = new AWS.Signers.V4(req, 'es'); // es: service code
    signer.addAuthorization(creds, new Date());

    const client = new AWS.NodeHttpClient();
    client.handleRequest(req, null, (httpResp) => {
        let respBody = '';
        httpResp.on('data', (chunk) => {
            respBody += chunk;
        });
        httpResp.on('end', () => {
            console.log('Response: ' + respBody);
            const status = httpResp.statusCode;
            // A completed HTTP exchange is not a success by itself: the
            // service reports rejected documents through the status code.
            if (typeof status === 'number' && (status < 200 || status >= 300)) {
                finish('Lambda failed with error HTTP ' + status + ': ' + respBody);
                return;
            }
            finish(null, 'Lambda added document ' + doc);
        });
    }, (err) => {
        console.log('Error: ' + err);
        finish('Lambda failed with error ' + err);
    });
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment