Last active
December 14, 2015 16:38
-
-
Save harrigan/5116118 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var _ = require('underscore'); | |
var async = require('async'); | |
var assert = require('assert'); | |
var awssum = require('awssum'); | |
var clone = require('clone'); | |
var retry = require('retry'); | |
var util = require('util'); | |
var winston = require('winston'); | |
// AWS settings are read from the environment. The script aborts with exit
// code 1 and a descriptive message as soon as a required setting is missing.
var env = process.env;
var requiredSettings = [
  { name: 'ACCESS_KEY_ID', message: 'You must specify an AWS Access Key ID.' },
  { name: 'SECRET_ACCESS_KEY', message: 'You must specify an AWS Secret Access Key.' },
  { name: 'ACCOUNT_ID', message: 'You must specify an AWS Account ID.' },
];
requiredSettings.forEach(function(setting) {
  // Environment values are always strings, so a falsy value means the
  // variable is either unset or empty; neither is usable.
  if (!env[setting.name]) {
    winston.error(setting.message);
    process.exit(1);
  }
});
var accessKeyId = env.ACCESS_KEY_ID;
var secretAccessKey = env.SECRET_ACCESS_KEY;
var awsAccountId = env.ACCOUNT_ID;
// Build the DynamoDB client from the credentials read above; the region is
// fixed to the amazon.US_EAST_1 constant.
var amazon = awssum.load('amazon/amazon');
var DynamoDB = awssum.load('amazon/dynamodb').DynamoDB;
var ddbOptions = {
  accessKeyId: accessKeyId,
  secretAccessKey: secretAccessKey,
  region: amazon.US_EAST_1
};
var ddb = new DynamoDB(ddbOptions);
/**
 * Converts a plain item into DynamoDB's typed attribute format.
 *
 * String attributes become `{ S: value }` and finite number attributes become
 * `{ N: '<decimal string>' }`. Attributes of any other type are left out, as
 * are NaN and +/-Infinity, which have no valid decimal representation.
 * The input is never modified.
 *
 * @param {Object} item - attribute name to plain value
 * @returns {Object} a new object holding only the typed attributes; values
 *     that are not objects (including null) are returned unchanged
 */
var formatItem = function(item) {
  if (item === null || typeof item !== 'object') {
    return item;
  }
  var formatted = {};
  Object.keys(item).forEach(function(key) {
    var value = item[key];
    if (typeof value === 'string') {
      formatted[key] = {
        S: value
      };
    } else if (typeof value === 'number' && isFinite(value)) {
      // numbers are sent as strings
      formatted[key] = {
        N: value.toString(),
      };
    }
    // everything else is intentionally dropped
  });
  return formatted;
};
/**
 * Queues a formatted item for `tableName` and writes the queue to DynamoDB
 * in batches via `ddb.BatchWriteItem`.
 *
 * A batch is sent once at least 25 requests are queued, or when
 * `formattedItem` is `null` (a flush request) and the queue is not empty.
 * At most 25 requests are sent per call; anything beyond that stays queued.
 * Failed calls are retried through `retry.operation` (up to 5 retries).
 * Requests reported back as unprocessed are returned to the front of the
 * queue so that a later call sends them again.
 *
 * @param {?Object} formattedItem - item in DynamoDB attribute format, or
 *     `null` to flush what is already queued
 * @param {string} tableName - table whose queue in `pendingItems` is used
 * @param {function(?Error)} callback - called with `null` when nothing had
 *     to be sent or the batch call succeeded, or with the main error once
 *     the retries are exhausted
 */
var importItem = function(formattedItem, tableName, callback) {
  var MAX_BATCH_SIZE = 25;
  var isFlush = formattedItem === null;
  var queues = pendingItems.RequestItems;

  // Create the queue on first use so any table name works.
  if (!Array.isArray(queues[tableName])) {
    queues[tableName] = [];
  }
  if (!isFlush) {
    queues[tableName].push({
      'PutRequest' : {
        'Item' : formattedItem,
      },
    });
  }

  var queued = queues[tableName].length;
  // ">=" rather than "===": the queue can exceed the batch size when a full
  // batch comes back unprocessed and more items are added afterwards.
  var shouldSend = queued >= MAX_BATCH_SIZE || (queued > 0 && isFlush);
  if (!shouldSend) {
    callback(null);
    return;
  }

  // Send only this table's requests, capped at the batch size.
  var batch = queues[tableName].slice(0, MAX_BATCH_SIZE);
  var request = {
    'RequestItems': {},
  };
  request.RequestItems[tableName] = batch;

  var operation = retry.operation({
    retries: 5,
    factor: 3,
    minTimeout: 5000,
    maxTimeout: 60 * 1000,
    randomize: true,
  });
  operation.attempt(function() {
    ddb.BatchWriteItem(request, function(err, response) {
      if (operation.retry(err)) {
        winston.warn(util.inspect(err));
        winston.warn('Retrying to import the items...');
        return;
      }
      if (err) {
        winston.error('The import failed!');
        callback(operation.mainError());
        return;
      }
      var body = (response && response.Body) || {};
      var unprocessed =
          (body.UnprocessedItems && body.UnprocessedItems[tableName]) || [];
      // Keep the unprocessed requests plus everything that was not part of
      // this batch.
      queues[tableName] =
          unprocessed.concat(queues[tableName].slice(batch.length));
      callback(null);
    });
  });
};
/**
 * Lists every integer pair [i, j] with 0 <= i < x and 0 <= j < y.
 *
 * Pairs are ordered by i first, then by j.
 *
 * @param {number} x - exclusive upper bound of the first coordinate
 * @param {number} y - exclusive upper bound of the second coordinate
 * @returns {Array.<Array.<number>>} the pairs; empty when x or y is <= 0
 */
var range2d = function(x, y) {
  var pairs = [];
  for (var row = 0; row < x; row++) {
    for (var col = 0; col < y; col++) {
      pairs.push([row, col]);
    }
  }
  return pairs;
};
// Queue of put requests that have not been written yet, keyed by table name.
// The same object is used by every call to importItem.
var pendingItems = { RequestItems: {} };
pendingItems.RequestItems.myData = [];
// One task per [appId, ts] pair from range2d(10, 100); each task formats its
// item and hands it to importItem for the 'myData' table.
var pendingFunctions = _.map(range2d(10, 100), function(pair) {
  return function(callback) {
    var label = JSON.stringify(pair);
    winston.debug('Preparing ' + label + '...');
    var rawItem = {appId: pair[0], ts: pair[1]};
    importItem(formatItem(rawItem), 'myData', callback);
  };
});
// Logs the error and terminates the process with exit code 1; does nothing
// when there is no error.
var exitOnError = function(err) {
  if (err) {
    winston.error(util.inspect(err));
    process.exit(1);
  }
};
// True while the 'myData' queue still holds requests.
var hasPendingItems = function() {
  return pendingItems.RequestItems.myData.length > 0;
};
// Asks importItem to send whatever is queued (null item = flush).
var flushPendingItems = function(callback) {
  importItem(null, 'myData', callback);
};
// Run the import tasks one after another, then keep flushing until the
// queue is empty.
async.series(pendingFunctions, function(err) {
  exitOnError(err);
  async.whilst(hasPendingItems, flushPendingItems, exitOnError);
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment