Created
October 25, 2022 00:38
-
-
Save gtracy/9e327b6e788d664d04eb9937b1d4d1f1 to your computer and use it in GitHub Desktop.
Import CSV file into DynamoDB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
// | |
// core logic for importing and transforming a CSV dataset, and | |
// storing the results in a Dynamo table | |
// | |
const Fs = require('fs'); | |
const CsvReadableStream = require('csv-reader'); | |
const prompt = require('prompt-sync')({sigint: true}); | |
let AWS = require('aws-sdk'); | |
/**
 * Import a CSV file into a DynamoDB table, creating the table first if it
 * does not already exist.
 *
 * Fixes over the original:
 *  - `createTable` was called with a node-style callback AND `await`ed; with
 *    a callback the SDK returns an `AWS.Request` (not a thenable), so the
 *    `await` was a no-op and everything ran inside an un-awaited callback.
 *    We now use `.promise()` with try/catch.
 *  - the function resolved before the CSV stream finished, so callers saw
 *    "all done" before rows were written and stream errors were lost. The
 *    stream is now wrapped in a Promise settled on 'end'/'error'.
 *
 * @param {object} AWS          configured aws-sdk module
 * @param {string} csv_file     path of the CSV file to ingest
 * @param {string} dynamo_table destination table name
 * @param {object} table_params createTable parameters for that table
 * @returns {Promise<void>} resolves once every row has been written
 */
let csvToDynamo = async function(AWS, csv_file, dynamo_table, table_params) {
  const ddb = new AWS.DynamoDB({ apiVersion: '2012-08-10' });
  const dynamoClient = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10' });

  const batch_size = 25;          // hard limit for a single DynamoDB batchWrite
  const concurrentRequests = 25;  // queued batches before we drain the promises
  let batch_num = 1;
  let write_bucket = [];
  let promises = [];

  // Write one batch (<= 25 rows) to DynamoDB via batchWrite.
  // NOTE(review): UnprocessedItems in the batchWrite response are not
  // retried — same as the original behavior; confirm acceptable for load size.
  async function saveToDynamoDB(batch) {
    const putReqs = batch.map(item => ({
      PutRequest: {
        Item: item
      }
    }));
    const req = {
      RequestItems: {
        [dynamo_table]: putReqs
      }
    };
    await dynamoClient.batchWrite(req).promise();
    console.log(' batch of ' + batch.length + ' written to dynamo');
  }

  console.log("\ningest the CSV file into DynamoDB.");
  console.log("... create the table if it doesn't already exist");
  try {
    const data = await ddb.createTable(table_params).promise();
    console.log("Created table. Table description JSON:", JSON.stringify(data, null, 2));
    console.log("... take a quick break, and give AWS a chance to create the table ...");
    await new Promise(r => setTimeout(r, 5000));
  } catch (err) {
    if (err.code === 'ResourceInUseException') {
      // totally fine if the table already exists
      console.error("... table " + dynamo_table + " already exists");
    } else {
      // any other createTable failure is fatal
      console.dir(err);
      process.exit(1);
    }
  }

  console.log(`\n... reading CSV data from ${csv_file}`);
  console.log("... this task of writing to Dynamo may take a little while!");

  // Wrap the stream so the function only resolves when ingestion finishes,
  // and stream/write errors propagate to the caller's try/catch.
  await new Promise((resolve, reject) => {
    const inputStream = Fs.createReadStream(csv_file, 'utf8');
    const csvStream = new CsvReadableStream({ skipHeader: true, asObject: true, trim: true });

    inputStream
      .pipe(csvStream)
      .on('data', async function (row) {
        /***************************************/
        // data transformations go here
        //
        console.dir(row);
        /***************************************/
        // push the data row into our batch write bucket
        write_bucket.push(row);
        // once we have a full batch, queue a write
        if (write_bucket.length % batch_size === 0) {
          console.log(` batch ${batch_num}`);
          csvStream.pause();
          promises.push(saveToDynamoDB(write_bucket));
          // throttle: drain outstanding writes every `concurrentRequests` batches
          if (promises.length % concurrentRequests === 0) {
            console.log('... awaiting write requests to DynamoDB\n');
            await Promise.all(promises);
            promises = [];
          }
          write_bucket = [];
          batch_num++;
          csvStream.resume();
        }
      })
      .on('end', async function () {
        try {
          if (promises.length > 0) {
            console.log('... awaiting write to DynamoDB\n');
            await Promise.all(promises);
          }
          // flush any remnants smaller than a full batch
          if (write_bucket.length > 0) {
            console.log('... found some remnants. saving these now.');
            await saveToDynamoDB(write_bucket);
          }
          console.log('No more rows!');
          resolve();
        } catch (err) {
          reject(err);
        }
      })
      .on('error', (err) => {
        console.error('Failed to read the input file ' + csv_file);
        console.dir(err);
        reject(err);
      });
  });
};
// Entry point: configure the AWS SDK (real credentials in prod, a local
// DynamoDB endpoint otherwise), prompt for a table name and CSV path, then
// run the import.
(async () => {
  let aws_config = {};
  if (process.env.NODE_ENV === 'prod') {
    aws_config = {
      region: 'us-east-2',
      // FIX: aws-sdk v2 expects `accessKeyId`/`secretAccessKey`; the original
      // `access_id`/`access_secret` keys are not recognized by AWS.Config,
      // so prod credentials were silently ignored.
      accessKeyId: process.env.AWS_ACCESS_ID,
      secretAccessKey: process.env.AWS_ACCESS_SECRET
    };
  } else {
    aws_config = {
      region: 'local',
      endpoint: 'http://localhost:8000'
    };
  }
  AWS.config.update(aws_config);
  console.log("\nAWS config :");
  // don't echo credentials to the console — log only the non-secret fields
  console.dir({ region: aws_config.region, endpoint: aws_config.endpoint });

  const table_name = prompt('enter the dynamo table name: ');
  // table definition; replace FIXME with the real partition-key attribute
  const params = {
    TableName: table_name,
    KeySchema: [
      { AttributeName: "FIXME", KeyType: "HASH" },
    ],
    AttributeDefinitions: [
      { AttributeName: "FIXME", AttributeType: "S" },
    ],
    ProvisionedThroughput: {
      ReadCapacityUnits: 10,
      WriteCapacityUnits: 25
    }
  };

  // import the CSV file into our new table
  try {
    const csv_input = prompt('enter the CSV input file: ');
    await csvToDynamo(AWS, csv_input, table_name, params);
  } catch (err) {
    console.log('import failed!');
    console.log(err);
    process.exit(-1);
  }
  console.log("\nall done.\n");
})();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment