This is a simple AWS Lambda function to syncronize data from DynamoDB to ElasticSearch, it is triggered via DynamoDB streams. Also transforms data types.
const fetch = require('node-fetch');
const es_url = "https://elasticsearch.us-east-1.es.amazonaws.com";
exports.handler = async (event) => {
if (event.Records !== undefined && event.Records.length > 0) {
for (let record of event.Records) {
if (record.eventName === "INSERT" || record.eventName === "MODIFY") {
const newImage = record.dynamodb.NewImage;
const location = newImage.location;
await fetch(`${es_url}/index/marker/${newImage.markerId.S}`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
map_id: newImage.mapId.N,
user_posted: newImage.userPosted.N,
date_posted: newImage.datePosted.S,
reaction_total_verify: newImage.reactionTotalVerify.N,
reaction_total_notexist: newImage.reactionTotalNotExist.N,
location: {
lat: location.M.lat.N,
lon: location.M.lon.N
}
})
})
.catch(err => console.error(err));
}
else if (record.eventName === "REMOVE") {
const markerId = record.dynamodb.Keys.markerId.S;
await fetch(`${es_url}/index/marker/${markerId}`, {
method: "DELETE"
})
.catch(err => console.error(err));
}
}
}
const response = {
statusCode: 200,
body: JSON.stringify('ok'),
};
return response;
};