Skip to content

Instantly share code, notes, and snippets.

@dawolf-tty
Created October 30, 2020 22:20
Show Gist options
  • Save dawolf-tty/b3745042d013cc2f0230ca538a3f570f to your computer and use it in GitHub Desktop.
Save dawolf-tty/b3745042d013cc2f0230ca538a3f570f to your computer and use it in GitHub Desktop.
import { DynamoDB } from "aws-sdk";
import * as _ from "lodash";
import { RateLimit } from 'async-sema';
// Maximum number of put/delete requests sent in a single batchWrite call;
// batchWrite() below splits its input into chunks of this size.
const MAX_BATCH_ITEMS = 25; // AWS hard limit
// Value handed to async-sema's RateLimit() in batchWrite() below.
// NOTE(review): despite the name, RateLimit throttles how many calls may
// start per time unit (per second by default) rather than capping the number
// of requests in flight at once — confirm this is the intended behaviour.
const MAX_PARALLEL_WRITES = 50;
/**
 * Writes a list of put/delete requests to a single DynamoDB table.
 *
 * The requests are split into chunks of at most `MAX_BATCH_ITEMS` and each
 * chunk is sent with `documentClient.batchWrite`. Progress and every item
 * that could not be written are logged to the console.
 *
 * Items reported back as unprocessed are only logged, NOT retried, and a
 * chunk whose request throws is logged and skipped, so a failing chunk does
 * not make the returned promise reject.
 *
 * @param documentClient - DocumentClient used to issue the batchWrite calls.
 * @param tableName - Name of the table all requests are written to.
 * @param requests - Put/Delete write requests; may be empty.
 * @returns One entry per chunk, in chunk order: the batchWrite response, or
 *   `undefined` when the call for that chunk threw.
 */
export async function batchWrite(
  documentClient: DynamoDB.DocumentClient,
  tableName: string,
  requests: DynamoDB.DocumentClient.WriteRequest[]
) {
  // Throttle how quickly chunks are submitted. Note that RateLimit is a
  // rate limiter (calls per time unit), not a semaphore on in-flight calls.
  const limit = RateLimit(MAX_PARALLEL_WRITES);
  // Running total of items confirmed written across all chunks.
  let totWrittenCtr = 0;

  // Logs one line per request that was not written (Put payload if present,
  // otherwise the Delete payload).
  const logUnwritten = (items: typeof requests): void => {
    items.forEach((x) =>
      console.log("--------" + (x.PutRequest === undefined ? JSON.stringify(x.DeleteRequest) : JSON.stringify(x.PutRequest)))
    );
  };

  return await Promise.all(
    _.chunk(requests, MAX_BATCH_ITEMS).map(async (chunk) => {
      // Wait for the rate limiter before submitting this chunk
      await limit();
      const params = { RequestItems: { [tableName]: chunk } };
      try {
        const res = await documentClient.batchWrite(params).promise();
        // UnprocessedItems is optional and may lack an entry for this table;
        // treat both cases as "nothing left unprocessed".
        const unprocessed = res.UnprocessedItems?.[tableName] ?? [];
        if (unprocessed.length > 0) {
          // Some items were not processed (typically due to insufficient
          // provisioned throughput): count the rest and log the leftovers.
          totWrittenCtr += chunk.length - unprocessed.length;
          console.log(`[OK] ${totWrittenCtr}`);
          console.log(`[KO] The following ${unprocessed.length} items have NOT been written:`);
          logUnwritten(unprocessed);
        } else {
          // The entire chunk has been written: just bump the total counter
          totWrittenCtr += chunk.length;
          console.log(`[OK] ${totWrittenCtr}`);
        }
        return res;
      } catch (e) {
        // If an exception occurs the entire chunk has not been written
        console.log(`[KO] The following items (entire chunk) have NOT been written:`);
        logUnwritten(chunk);
        console.log(`Exception details follow: \n${e}`);
        // Deliberate best-effort: report the failure and keep going.
        return undefined;
      }
    })
  );
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment