Skip to content

Instantly share code, notes, and snippets.

@mike-hogan
Last active July 17, 2019 19:53
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save mike-hogan/95990df6261e202896114075f7623bce to your computer and use it in GitHub Desktop.
Save mike-hogan/95990df6261e202896114075f7623bce to your computer and use it in GitHub Desktop.
Node.js DynamoDB writer with exponential backoff
"use strict";
var fs = require('fs');
var path = require('path');
var AWS = require('aws-sdk');
var util = require('util');
var stream = require('readable-stream');
var _ = require('lodash');
/**
 * Object-mode writable stream that collects incoming items in `this.queue`
 * and hands them to a DynamoDB DocumentClient in batches.
 *
 * May be called with or without `new`. During construction it loads AWS
 * settings from `config/aws.json` (resolved against the current working
 * directory) when that file is present, then sets the SDK-wide region to
 * `eu-west-1` and creates the DocumentClient used for all writes.
 *
 * @constructor
 */
function DynamoDbWriter() {
  if (!(this instanceof DynamoDbWriter)) {
    return new DynamoDbWriter();
  }
  stream.Writable.call(this, {objectMode: true});

  // Flush threshold used by enqueue(); DynamoDB's BatchWriteItem accepts at
  // most 25 requests per call.
  this.maxNumberInBatch = 25;
  this.queue = [];

  // Only an ENOENT failure counts as "absent". Any other stat failure is
  // reported as present, so the subsequent load attempt surfaces the problem.
  const isPresent = (candidate) => {
    try {
      fs.statSync(candidate);
      return true;
    } catch (statError) {
      return statError.code !== 'ENOENT';
    }
  };

  const awsConfigPath = path.resolve('config/aws.json');
  if (isPresent(awsConfigPath)) {
    AWS.config.loadFromPath(awsConfigPath);
  }
  // Applied after the optional file load, so this region always wins.
  AWS.config.update({region: 'eu-west-1'});
  this.docClient = new AWS.DynamoDB.DocumentClient();
}
util.inherits(DynamoDbWriter, stream.Writable);
/**
 * Wraps one item in the `{PutRequest: {Item: ...}}` envelope used for the
 * entries of a batch-write request.
 *
 * @param {Object} item - The item to store; it is referenced, not copied.
 * @returns {{PutRequest: {Item: Object}}} The wrapped item.
 */
DynamoDbWriter.prototype.toPutRequest = function (item) {
  const PutRequest = {Item: item};
  return {PutRequest};
};
/**
 * Sends one batch-write request and keeps re-sending whatever DynamoDB
 * reports back as unprocessed, doubling the wait before each new attempt.
 *
 * @param {number} failureBackoffTime - Milliseconds to wait before retrying
 *   if this attempt leaves unprocessed items; doubled on every retry.
 * @param {Object} params - Parameters for `docClient.batchWrite`
 *   (`{RequestItems: ...}`).
 * @param {function(?Error, Object=)} callback - Called exactly once: with the
 *   SDK error if a request fails, or with the final response once nothing is
 *   left unprocessed.
 */
DynamoDbWriter.prototype.batchWriteToDynamo = function (failureBackoffTime, params, callback) {
  const self = this;
  self.docClient.batchWrite(params, (err, response) => {
    if (err) {
      // Throwing from inside the SDK's asynchronous callback cannot be caught
      // by any caller and would leave `callback` uncalled, so report the
      // failure through the callback instead. The queue is left untouched so
      // the caller can decide whether to retry or discard it.
      return callback(err);
    }

    const unprocessed = response ? response.UnprocessedItems : undefined;
    if (unprocessed && Object.keys(unprocessed).length > 0) {
      // Retry only the leftovers, not the whole original batch.
      const repostParams = {
        RequestItems: unprocessed
      };
      const retry = () => {
        return self.batchWriteToDynamo(failureBackoffTime * 2, repostParams, callback);
      };
      console.log("Dynamo return unprocessed items, sleeping %s ms before retry", failureBackoffTime);
      setTimeout(retry, failureBackoffTime);
      return;
    }

    console.log("Write to dynamo worked cleanly");
    // Everything was accepted, so the buffered items are no longer needed.
    self.queue = [];
    callback(err, response);
  });
};
/**
 * Writes every item currently buffered in `this.queue` to the "Trueput"
 * table as a single batch, starting with a 100 ms retry backoff.
 *
 * @param {function(?Error, Object=)} callback - Called when the batch has
 *   been written (or has failed). Called immediately with no arguments when
 *   there is nothing to write.
 * @returns {*} Whatever `batchWriteToDynamo` (or `callback`, for an empty
 *   queue) returns.
 */
DynamoDbWriter.prototype.flushToDynamo = function (callback) {
  const self = this;
  // DynamoDB rejects a batch whose request list is empty, so an empty queue
  // is treated as an already-completed flush.
  if (self.queue.length === 0) {
    return callback();
  }
  const putRequests = self.queue.map((item) => self.toPutRequest(item));
  const params = {
    RequestItems: {
      "Trueput": putRequests
    }
  };
  return self.batchWriteToDynamo(100, params, callback);
};
/**
 * Buffers one item and triggers a flush when the buffer reaches exactly
 * `maxNumberInBatch` entries.
 *
 * @param {Object} item - The item to append to `this.queue`.
 * @param {number} sizeInBytes - Accepted for the caller's benefit; not used
 *   by this method.
 * @param {function} callback - Passed to `flushToDynamo` when a flush is
 *   triggered, otherwise called immediately with no arguments.
 * @returns {*} The return value of `flushToDynamo` or of `callback`.
 */
DynamoDbWriter.prototype.enqueue = function (item, sizeInBytes, callback) {
  const pending = this.queue;
  pending.push(item);
  const batchIsFull = pending.length === this.maxNumberInBatch;
  return batchIsFull ? this.flushToDynamo(callback) : callback();
};
/**
 * Writable-stream hook: measures the incoming object and passes it to
 * `enqueue`, which decides whether to flush.
 *
 * NOTE(review): nothing visible in this file flushes a partially filled queue
 * when the stream ends; callers presumably need to call `flushToDynamo`
 * themselves for the final, smaller batch. Confirm against the call sites.
 *
 * @param {Object} data - The object written to the stream.
 * @param {string} encoding - Not used by this method.
 * @param {function(?Error)} callback - Signals that the write has been
 *   handled; receives an error if `data` cannot be serialised to JSON.
 * @returns {*} The return value of `enqueue` (or of `callback` on failure).
 */
DynamoDbWriter.prototype._write = function (data, encoding, callback) {
  const self = this;
  let sizeInBytes;
  try {
    // String length counts UTF-16 code units; byteLength gives the real
    // UTF-8 size, which differs as soon as the item has non-ASCII text.
    sizeInBytes = Buffer.byteLength(JSON.stringify(data), 'utf8');
  } catch (err) {
    // Report unserialisable input through the stream's error path rather than
    // throwing synchronously out of write().
    return callback(err);
  }
  return self.enqueue(data, sizeInBytes, callback);
};
// Expose the constructor as the `DynamoDbWriter` property of this module's exports.
module.exports.DynamoDbWriter = DynamoDbWriter;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment