Skip to content

Instantly share code, notes, and snippets.

@harrigan
Last active December 14, 2015 16:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save harrigan/5116118 to your computer and use it in GitHub Desktop.
Save harrigan/5116118 to your computer and use it in GitHub Desktop.
var _ = require('underscore');
var async = require('async');
var assert = require('assert');
var awssum = require('awssum');
var clone = require('clone');
var retry = require('retry');
var util = require('util');
var winston = require('winston');
var env = process.env;
// Abort with an error message when a required setting is missing. The checks
// run in order and stop at the first failure. An empty string is rejected as
// well: it can never be a usable key or account ID.
[
  ['ACCESS_KEY_ID', 'You must specify an AWS Access Key ID.'],
  ['SECRET_ACCESS_KEY', 'You must specify an AWS Secret Access Key.'],
  ['ACCOUNT_ID', 'You must specify an AWS Account ID.'],
].forEach(function(required) {
  var value = env[required[0]];
  if (typeof value === 'undefined' || value === '') {
    winston.error(required[1]);
    process.exit(1);
  }
});
var accessKeyId = env.ACCESS_KEY_ID;
var secretAccessKey = env.SECRET_ACCESS_KEY;
// NOTE(review): not referenced elsewhere in this file; confirm it is needed.
var awsAccountId = env.ACCOUNT_ID;
// awssum's shared Amazon constants (used here for the region name) and the
// DynamoDB service class.
var amazon = awssum.load('amazon/amazon');
var DynamoDB = awssum.load('amazon/dynamodb').DynamoDB;
// Single DynamoDB client, pinned to US East 1, that every BatchWriteItem
// request in this script goes through.
var ddb = new DynamoDB({
  accessKeyId: accessKeyId,
  secretAccessKey: secretAccessKey,
  region: amazon.US_EAST_1,
});
// Convert a plain item into DynamoDB's attribute-value format.
//
// Strings become {S: value} and finite numbers become {N: '<decimal>'}.
// Every other value is dropped: null, undefined, booleans, objects, arrays,
// and NaN/Infinity, which DynamoDB's number type cannot store.
// The input is not modified; a new object is returned. A null or undefined
// item is returned unchanged.
//
// Own keys are enumerated with Object.keys rather than underscore's
// collection helpers. Those helpers treat any object with a numeric `length`
// property as array-like, which would skip the item's real attributes.
var formatItem = function(item) {
  if (item == null) {
    return item;
  }
  var formatted = {};
  Object.keys(item).forEach(function(key) {
    var value = item[key];
    // Same type test as _.isString / _.isNumber (also matches wrapper objects).
    var tag = Object.prototype.toString.call(value);
    if (tag === '[object String]') {
      formatted[key] = {
        S: String(value),
      };
    } else if (tag === '[object Number]' && isFinite(value)) {
      formatted[key] = {
        N: value.toString(),
      };
    }
  });
  return formatted;
};
// Queue one formatted item for `tableName` and, once a full batch is queued
// or a flush is requested, send a single BatchWriteItem request for it.
//
// formattedItem - an item as produced by formatItem, or null to flush: send
//                 whatever is queued for the table (at most one batch per call).
// tableName     - key into the module-level pendingItems.RequestItems; that
//                 array must already exist.
// callback      - called with null when the item was merely queued, when there
//                 was nothing to flush, or when the batch was written. After all
//                 retries fail, it is called with the retry library's main
//                 error, and the batch stays queued.
//
// Items that DynamoDB reports as unprocessed go back to the front of the
// queue, so a later batch sends them again.
// NOTE(review): unprocessed items are resent without any delay; AWS recommends
// exponential backoff for them.
var importItem = function(formattedItem, tableName, callback) {
  // BatchWriteItem accepts at most 25 put/delete requests per call.
  var maxBatchSize = 25;
  var queue = pendingItems.RequestItems[tableName];
  if (formattedItem !== null) {
    queue.push({
      'PutRequest' : {
        'Item' : formattedItem,
      },
    });
  }
  // Use `>=` rather than `===`. Re-queued unprocessed items can push the queue
  // past the limit, and an exact comparison would then never fire again.
  var batchIsFull = queue.length >= maxBatchSize;
  var mustFlush = formattedItem === null && queue.length > 0;
  if (!batchIsFull && !mustFlush) {
    callback(null);
    return;
  }
  // Send only this table's first maxBatchSize requests; the rest stay queued.
  var batch = queue.slice(0, maxBatchSize);
  var request = {
    'RequestItems': {},
  };
  request.RequestItems[tableName] = batch;
  var operation = retry.operation({
    retries: 5,
    factor: 3,
    minTimeout: 5000,
    maxTimeout: 60 * 1000,
    randomize: true,
  });
  operation.attempt(function() {
    ddb.BatchWriteItem(request, function(err, response) {
      if (operation.retry(err)) {
        winston.warn(util.inspect(err));
        winston.warn('Retrying to import the items...');
      } else if (err) {
        winston.error('The import failed!');
        callback(operation.mainError());
      } else {
        var unprocessedByTable = response.Body.UnprocessedItems || {};
        var unprocessed = unprocessedByTable[tableName] || [];
        // Drop the requests just sent, then put back the ones DynamoDB did
        // not process, ahead of anything still waiting.
        var stillWaiting =
            pendingItems.RequestItems[tableName].slice(batch.length);
        pendingItems.RequestItems[tableName] = unprocessed.concat(stillWaiting);
        callback(null);
      }
    });
  });
};
// Return every [i, j] pair with 0 <= i < x and 0 <= j < y, in row-major order
// (all j for i = 0 first, then i = 1, and so on). Yields [] when either bound
// is 0 or negative.
var range2d = function(x, y) {
  var values = [];
  for (var i = 0; i < x; i++) {
    for (var j = 0; j < y; j++) {
      values.push([i, j]);
    }
  }
  return values;
};
// Write buffer shared by every importItem call. importItem appends PutRequests
// to RequestItems[<table>] and replaces that array after each batch it sends.
// Only the 'myData' table is set up here.
var pendingItems = {RequestItems: {myData: []}};
// One async.series task per (appId, ts) pair, 10 x 100 in all. Each task logs
// the pair, formats it as a DynamoDB item and hands it to importItem for the
// 'myData' table.
var pendingFunctions = range2d(10, 100).map(function(pair) {
  var appId = pair[0];
  var ts = pair[1];
  return function(callback) {
    winston.debug('Preparing ' + JSON.stringify(pair) + '...');
    importItem(formatItem({appId: appId, ts: ts}), 'myData', callback);
  };
});
// Run the per-item tasks one after another. Then call importItem(null, ...)
// repeatedly until the 'myData' buffer is empty, so that a final, partial
// batch is written. Any error is logged and ends the process with status 1.
async.series(pendingFunctions, function(seriesErr) {
  var abort = function(reason) {
    winston.error(util.inspect(reason));
    process.exit(1);
  };
  if (seriesErr) {
    abort(seriesErr);
  }
  async.whilst(
    function() {
      return pendingItems.RequestItems.myData.length > 0;
    },
    function(next) {
      importItem(null, 'myData', next);
    },
    function(flushErr) {
      if (flushErr) {
        abort(flushErr);
      }
    }
  );
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment