Skip to content

Instantly share code, notes, and snippets.

@park-brian
Last active July 19, 2021 23:49
Show Gist options
  • Save park-brian/e5b9ed2c6a12639b5b02fc8951d91908 to your computer and use it in GitHub Desktop.
Uses parallel batchWrite requests to quickly load data into DynamoDB.
/**
 * Resolves after the given delay.
 * @param {number} ms - Delay in milliseconds.
 * @returns {Promise<void>} Promise that fulfills once the timeout fires.
 */
function sleep(ms) {
  return new Promise((done) => {
    setTimeout(done, ms);
  });
}
/**
 * Writes a batch of items into a DynamoDB table with a single batchWrite call,
 * retrying any unprocessed items with exponential backoff.
 *
 * @param {object} documentClient - AWS.DynamoDB.DocumentClient (v2 SDK style: `.batchWrite(...).promise()`).
 * @param {string} tableName - Target table name.
 * @param {object[]} items - Items to put (callers must keep this within DynamoDB's 25-item batchWrite limit).
 * @returns {Promise<object[]|undefined>} Every batchWrite response in order, or undefined when `items` is empty.
 */
async function insertDynamoDBItems(documentClient, tableName, items) {
  if (!items.length) return;
  const responses = [];
  let response = await documentClient
    .batchWrite({
      RequestItems: {
        [tableName]: items.map((Item) => ({
          PutRequest: { Item },
        })),
      },
      ReturnConsumedCapacity: 'TOTAL',
    })
    .promise();
  responses.push(response);

  // Retry unprocessed items with exponential backoff.
  // FIX: UnprocessedItems is a map keyed by table name ({ [table]: [...] }),
  // NOT an array — the original `?.length` check was always undefined, so the
  // retry loop never ran. The retry payload must also be re-wrapped under
  // RequestItems, which the original omitted.
  let backoff = 100;
  while (Object.keys(response.UnprocessedItems ?? {}).length > 0) {
    console.warn(`Batch write did not succeed, retrying in ${backoff}ms`);
    // Inline delay so this function is self-contained.
    await new Promise((resolve) => setTimeout(resolve, backoff));
    response = await documentClient
      .batchWrite({ RequestItems: response.UnprocessedItems })
      .promise();
    responses.push(response);
    backoff *= 2;
  }
  return responses;
}
/**
 * Imports records into a DynamoDB table, grouping them into batchWrite-sized
 * batches and flushing up to `maxBatches` batches in parallel.
 *
 * @param {object} documentClient - AWS.DynamoDB.DocumentClient (v2 SDK style).
 * @param {string} tableName - Target table name.
 * @param {Iterable|AsyncIterable} items - Records to insert; `for await` accepts sync or async iterables.
 * @param {object} [options]
 * @param {number} [options.batchSize=25] - Items per batchWrite call (25 is DynamoDB's hard limit).
 * @param {number} [options.maxBatches=100] - Batches flushed in parallel (defaults: 25 * 100 = 2500 records per flush).
 * @returns {Promise<number>} Total number of records submitted (the original computed this count but never returned it).
 */
async function importDynamoDBTable(
  documentClient,
  tableName,
  items,
  { batchSize = 25, maxBatches = 100 } = {}
) {
  let batch = [];
  let batches = [];
  let count = 0;

  // Flush all accumulated batches in parallel and track the running total.
  async function insertBatches() {
    count += batches.reduce((total, b) => total + b.length, 0);
    const responses = await Promise.all(
      batches.map((b) => insertDynamoDBItems(documentClient, tableName, b))
    );
    batches = [];
    return responses;
  }

  for await (const item of items) {
    batch.push(item);
    if (batch.length >= batchSize) {
      batches.push(batch);
      batch = [];
      if (batches.length >= maxBatches) {
        await insertBatches();
      }
    }
  }

  // Flush any remaining partial batch; skip pushing an empty one.
  if (batch.length) batches.push(batch);
  await insertBatches();
  return count;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment