Skip to content

Instantly share code, notes, and snippets.

@Sandyman
Created May 20, 2017 06:04
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Sandyman/6bd342264b79838ae38958a7e08283b7 to your computer and use it in GitHub Desktop.
Save Sandyman/6bd342264b79838ae38958a7e08283b7 to your computer and use it in GitHub Desktop.
Index Elasticsearch documents streaming from DynamoDB
'use strict';
const Elasticsearch = require('aws-es');
const fetch = require('node-fetch');
const ES_SERVICE_ENDPOINT = process.env.ES_SERVICE_ENDPOINT;
const ES_INDEX_NAME = process.env.ES_DOMAIN_NAME;
const accessKey = process.env.ACCESS_KEY;
const secretKey = process.env.SECRET_ACCESS_KEY;
const es = new Elasticsearch({
accessKeyId: accessKey,
secretAccessKey: secretKey,
service: 'es',
region: 'us-east-1',
host: ES_SERVICE_ENDPOINT
});
/**
* Get cafe related information from the image object (DynamoDB JSON syntax)
* @param data
* @param image
*/
const parseCafe = (data, image) => {
const blends = [];
const beans = [];
if (image.hasOwnProperty('beans')) {
image.beans.L.map(function(bean) {
const beanObj = bean.M;
beans.push(beanObj.name.S);
if (beanObj.hasOwnProperty('blends')) {
beanObj.blends.L.map(function(blend) {
blends.push(blend.M.name.S);
});
}
});
}
// Convert all fields from DynamoDB json syntax
data.name = image.name.S;
data.beans = beans;
data.blends = blends;
/* other fields go here, but you get the gist */
};
/**
* Get coffee related information from the image object (DynamoDB JSON syntax)
* @param data
* @param image
*/
const parseCoffee = (data, image) => {
const blends = [];
if (image.hasOwnProperty('blends')) {
image.blends.L.map(function(blend) {
blends.push(blend.M.name.S);
});
}
// Convert all fields from DynamoDB json syntax
data.name = image.name.S;
data.blends = blends;
};
const parsers = {
'CafesTable-dev': parseCafe,
'CoffeesTable-dev': parseCoffee
};
/**
* Update a single search document
* @param record
* @returns {Promise}
*/
const updateDocument = (record) => new Promise((resolve, reject) => {
// Get table name from the event source ARN, which determines the type
const tableName = /^[^\/]+\/([^\/]+)\/.*$/.exec(record.eventSourceARN)[1];
const type = tableName === 'CafesTable-dev' ? 'cc' : 'bb';
const id = record.dynamodb.Keys.id.S;
const options = {
index: ES_INDEX_NAME,
type: type,
id: id
};
if (record.eventName === 'REMOVE') {
// Document to be removed from index
es.delete(options, (err, data) => {
if (err) return reject(err);
else return resolve(data);
});
} else {
// New or updated document to be added/updated
const data = {id : id};
// Parse the record image based on the table name and update data
parsers[tableName](data, record.dynamodb.NewImage);
// data will have been updated by the parser
options.body = data;
es.index(options, (err, data) => {
if (err) return reject(err);
else return resolve(data);
})
}
});
/**
* Main service handler - triggered from a DynamoDB stream event
* @param event
* @param context
* @param callback
*/
module.exports.update = (event, context, callback) => {
console.log(JSON.stringify(event, null, 3));
// updateDocument() returns a promise
const promises = event.Records.map(record => updateDocument(record));
Promise.all(promises)
.then(r => {
// We log the result of every document
r.forEach(x => console.log(JSON.stringify(x, null, 3)));
return callback(null, 'All good');
})
.catch(err => {
// Log error if something goes wrong
console.log("Something went wrong: " + err.message);
return callback(null, err.message)
});
};
@zoellner
Copy link

zoellner commented Feb 5, 2019

tip: check AWS.DynamoDB.Converter.unmarshall from the aws-sdk if you don't want to deal with the dynamodb json format. It converts a DynamoDB record into a JavaScript object.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment