Skip to content

Instantly share code, notes, and snippets.

@gtracy
Created October 25, 2022 00:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gtracy/9e327b6e788d664d04eb9937b1d4d1f1 to your computer and use it in GitHub Desktop.
Save gtracy/9e327b6e788d664d04eb9937b1d4d1f1 to your computer and use it in GitHub Desktop.
Import CSV file into DynamoDB
'use strict';
//
// core logic for importing and transforming a CSV dataset, and
// storing the results in a Dynamo table
//
const Fs = require('fs');
const CsvReadableStream = require('csv-reader');
const prompt = require('prompt-sync')({sigint: true});
let AWS = require('aws-sdk');
//
// Import a CSV file into a DynamoDB table.
//
// AWS          - configured aws-sdk (v2) module
// csv_file     - path of the CSV file to read
// dynamo_table - target table name
// table_params - full CreateTable params (used if the table doesn't exist)
//
// Resolves only after every row has been written to DynamoDB.
// Exits the process on an unexpected CreateTable failure.
//
let csvToDynamo = async function(AWS, csv_file, dynamo_table, table_params) {
    const ddb = new AWS.DynamoDB({ apiVersion: '2012-08-10' });
    const dynamoClient = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10' });

    const batch_size = 25;          // DynamoDB BatchWriteItem hard limit
    const concurrentRequests = 25;  // in-flight batches before we pause and drain
    let batch_num = 1;
    let write_bucket = [];
    let promises = [];

    // Write one batch of rows. BatchWrite is not all-or-nothing: under
    // throttling DynamoDB returns UnprocessedItems, which the original code
    // silently dropped — retry them until the batch is fully written.
    async function saveToDynamoDB(batch) {
        const putReqs = batch.map(item => ({
            PutRequest: {
                Item: item
            }
        }));
        let result = await dynamoClient.batchWrite({
            RequestItems: {
                [dynamo_table]: putReqs
            }
        }).promise();
        while (result.UnprocessedItems && Object.keys(result.UnprocessedItems).length > 0) {
            // brief backoff before retrying the leftovers
            await new Promise(r => setTimeout(r, 500));
            result = await dynamoClient.batchWrite({ RequestItems: result.UnprocessedItems }).promise();
        }
        console.log(' batch of ' + batch.length + ' written to dynamo');
    }

    console.log("\ningest the CSV file into DynamoDB.");
    console.log("... create the table if it doesn't already exist");
    try {
        // FIX: the original passed a callback AND awaited the AWS.Request
        // object, which is not thenable — so the function returned before the
        // import ran. Use the .promise() API instead.
        const data = await ddb.createTable(table_params).promise();
        console.log("Created table. Table description JSON:", JSON.stringify(data, null, 2));
        console.log("... take a quick break, and give AWS a chance to create the table ...");
        // Wait until the table is actually ACTIVE rather than sleeping a
        // fixed 5 seconds and hoping.
        await ddb.waitFor('tableExists', { TableName: dynamo_table }).promise();
    } catch (err) {
        if (err.code === "ResourceInUseException") {
            // totally fine if the table already exists.
            console.error("... table " + dynamo_table + " already exists");
        } else {
            // otherwise, exit.
            console.dir(err);
            process.exit(1);
        }
    }

    console.log(`\n... reading CSV data from ${csv_file}`);
    console.log("... this task of writing to Dynamo may take a little while!");
    const inputStream = Fs.createReadStream(csv_file, 'utf8');
    const csvStream = new CsvReadableStream({ skipHeader: true, asObject: true, trim: true });

    // Wrap the stream lifecycle in a promise so callers can await the whole
    // import (the original resolved before any row was written).
    await new Promise((resolve, reject) => {
        inputStream
            .pipe(csvStream)
            .on('error', (err) => {
                console.error('Failed to read the input file ' + csv_file);
                console.dir(err);
                reject(err);
            })
            .on('data', async function (row) {
                /***************************************/
                // data transformations go here
                //
                console.dir(row);
                /***************************************/
                // push the data row into our batch write bucket
                write_bucket.push(row);
                if (write_bucket.length === batch_size) {
                    console.log(` batch ${batch_num}`);
                    // pause the stream while we hand off the batch, so rows
                    // don't pile up unbounded while Dynamo is slow
                    csvStream.pause();
                    promises.push(saveToDynamoDB(write_bucket));
                    write_bucket = [];
                    batch_num++;
                    if (promises.length === concurrentRequests) {
                        console.log('... awaiting write requests to DynamoDB\n');
                        const inflight = promises;
                        promises = [];
                        try {
                            await Promise.all(inflight);
                        } catch (err) {
                            reject(err);
                            return;
                        }
                    }
                    csvStream.resume();
                }
            })
            .on('end', async function () {
                try {
                    // flush any remnants smaller than a full batch
                    if (write_bucket.length > 0) {
                        console.log('... found some remnants. saving these now.');
                        promises.push(saveToDynamoDB(write_bucket));
                        write_bucket = [];
                    }
                    if (promises.length > 0) {
                        console.log('... awaiting write to DynamoDB\n');
                        await Promise.all(promises);
                    }
                    console.log('No more rows!');
                    resolve();
                } catch (err) {
                    reject(err);
                }
            });
    });
};
// Entry point: configure the AWS SDK (prod vs. local DynamoDB), prompt for a
// table name and CSV path, then run the import. Exits non-zero on failure.
(async () => {
    let aws_config = {};
    if (process.env.NODE_ENV === 'prod') {
        aws_config = {
            region: 'us-east-2',
            // FIX: aws-sdk v2 recognizes accessKeyId / secretAccessKey —
            // the original keys (access_id / access_secret) were silently
            // ignored, so prod ran with no credentials.
            accessKeyId: process.env.AWS_ACCESS_ID,
            secretAccessKey: process.env.AWS_ACCESS_SECRET
        };
    } else {
        // DynamoDB Local accepts any region string; point at localhost.
        aws_config = {
            region: 'local',
            endpoint: 'http://localhost:8000'
        };
    }
    AWS.config.update(aws_config);
    console.log("\nAWS config :");
    // Never print the secret key to stdout; redact it for the log.
    console.dir({ ...aws_config, secretAccessKey: aws_config.secretAccessKey ? '***' : undefined });

    const table_name = prompt('enter the dynamo table name: ');
    // CreateTable params used only if the table doesn't already exist.
    // FIXME: fill in the real partition-key attribute name/type.
    const params = {
        TableName: table_name,
        KeySchema: [
            { AttributeName: "FIXME", KeyType: "HASH" },
        ],
        AttributeDefinitions: [
            { AttributeName: "FIXME", AttributeType: "S" },
        ],
        ProvisionedThroughput: {
            ReadCapacityUnits: 10,
            WriteCapacityUnits: 25
        }
    };

    // import the GTFS file into our new table
    try {
        const csv_input = prompt('enter the CSV input file: ');
        await csvToDynamo(AWS, csv_input, table_name, params);
    } catch (err) {
        console.log('import failed!');
        console.log(err);
        process.exit(-1);
    }
    console.log("\nall done.\n");
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment