Created
August 14, 2020 11:38
-
-
Save adamfortuno/41ace4a7a80ac5fe394290cd6e5f5c47 to your computer and use it in GitHub Desktop.
Data Load to DynamoDB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict';

/*
    .Synopsis
    Import Data from File to DynamoDB Table

    .Description
    Imports data from a file expressed as JSON to a
    DynamoDB table.

    .Parameter TableName
    The name of the DynamoDB table receiving data from
    the import file.

    .Parameter AWSRegion
    The AWS Region hosting the target DynamoDB table.

    .Parameter AWSProfile
    The name of the AWS CLI Profile containing the
    AWS account's connection credentials.

    .Parameter PathToImportFile
    The full or relative path to the import file.

    .Parameter ForceInsert
    Directs the script to insert data even if the table has data.
    By default, this parameter is set to false, which means
    the script won't insert data if the table already has data.

    .Example
    node --max-old-space-size=8192 .\data.load.js 'Foobar' 'us-west-2' 'eng' '.\foobar.development.json' false
*/

// process.argv[0] and process.argv[1] are skipped by the destructuring below,
// so the four required arguments occupy indexes 2-5: fewer than six entries
// means at least one required argument is missing.
if (process.argv.length < 6) {
    throw new Error ('Please pass the table-name, aws-Region, aws-Profile, and file-path to the script.');
}

// Force is optional: it stays `false` when omitted, otherwise it is the raw
// command-line string (compared against 'true' later in the script).
const [, , TableName, Region, Profile, ImportFile, Force = false] = process.argv;

// These are set before aws-sdk is required further down the file.
// process.env stores strings, so the flag is assigned as a string explicitly
// instead of relying on implicit conversion of a boolean.
process.env.AWS_SDK_LOAD_CONFIG = 'true';
process.env.AWS_PROFILE = Profile;
// aws-sdk is required only after AWS_SDK_LOAD_CONFIG / AWS_PROFILE have been
// assigned on process.env earlier in this script; keep that ordering.
const AWS = require('aws-sdk');
const fs = require('fs');
const JSONStream = require('JSONStream');

// Point the SDK at the requested region and create the document client
// shared by every DynamoDB helper in this script.
AWS.config.update({ region: Region });
const ddbc = new AWS.DynamoDB.DocumentClient();

// Echo the effective settings so the operator can confirm the target
// before any data is written.
console.log('Target Profile: ', Profile);
console.log('Target Region: ', Region);
console.log('Target Table: ', TableName);
console.log('Source File: ', ImportFile);
console.log('Force Import: ', Force);
/**
 * Issues a single COUNT-only scan against a table so the caller can tell
 * whether the table already holds items.
 *
 * Only one scan request is made (no follow-up requests are issued), so the
 * `Count` in the response describes what that one request evaluated. That is
 * sufficient for the "is it empty?" check this script performs, but it should
 * not be treated as an exact row count.
 *
 * @param {string} TableName - Name of the DynamoDB table to inspect.
 * @returns {Promise<object>} Resolves with the scan response (callers read
 *   its `Count` property); rejects with the error reported by the scan.
 */
const ddb_table_has_items = (TableName) => {
    return new Promise((resolve, reject) => {
        const ddb_query_parameters = { TableName, Select: 'COUNT' };

        ddbc.scan(ddb_query_parameters, (error, data) => {
            if (error) {
                reject(error);
                return;
            }
            resolve(data);
        });
    });
};
/**
 * Writes one item to a table through the document client's `put` call and
 * exposes the callback result as a Promise.
 *
 * @param {string} TableName - Name of the DynamoDB table receiving the item.
 * @param {object} Item - The item to write, passed to `put` unchanged.
 * @returns {Promise<object>} Resolves with the `put` response; rejects with
 *   the error reported by `put`.
 */
const ddb_table_upsert_items = (TableName, Item) => {
    return new Promise((resolve, reject) => {
        const ddb_insert_payload = { TableName, Item };

        ddbc.put(ddb_insert_payload, (error, data) => {
            if (error) {
                reject(error);
                return;
            }
            resolve(data);
        });
    });
};
/**
 * Streams the import file through a JSON parser and writes every parsed
 * element to the target table, one `ddb_table_upsert_items` call per element.
 *
 * A failed write does not abort the load: it is logged, counted in
 * `count_failed`, and the remaining items are still attempted. The returned
 * Promise settles only after the parser has ended and every write has settled.
 *
 * @param {string} TableName - Name of the DynamoDB table receiving the items.
 * @param {string} ImportFile - Path to the JSON import file.
 * @returns {Promise<{count_succeeded: number, count_failed: number, count_attempted: number}>}
 *   Resolves with the tallies once all writes have settled; rejects when the
 *   file cannot be read or the JSON parser reports an error.
 */
const ddb_bulk_load = (TableName, ImportFile) => {
    return new Promise((resolve, reject) => {
        let count_succeeded = 0;
        let count_failed = 0;
        let count_attempted = 0;
        const inserts = [];

        const json_stream = JSONStream.parse("*");
        const source_data_stream = fs.createReadStream(ImportFile);

        // pipe() does not forward errors from the source to the destination,
        // so a missing or unreadable file must be handled on the read stream
        // itself; otherwise it surfaces as an unhandled 'error' event.
        source_data_stream.on("error", (error) => {
            reject(error);
        });

        const ddb_source_item = source_data_stream.pipe(json_stream);

        ddb_source_item.on("data", (source_data_item) => {
            count_attempted++;

            // Each write handles its own failure so that one bad item is
            // tallied instead of rejecting the whole load.
            const ddb_insert = ddb_table_upsert_items(TableName, source_data_item)
                .then(() => {
                    count_succeeded++;
                })
                .catch((error) => {
                    count_failed++;
                    console.log(JSON.stringify(error));
                });

            inserts.push(ddb_insert);
        });

        ddb_source_item.on("end", () => {
            // The parser is done, but writes may still be in flight.
            Promise.all(inserts)
                .then(() => {
                    resolve({count_succeeded, count_failed, count_attempted});
                })
                .catch((error) => {
                    console.log(error);
                    reject(error);
                });
        });

        ddb_source_item.on("error", (error) => {
            // Stop reading and release the file handle once parsing has failed.
            source_data_stream.destroy();
            reject(error);
        });
    });
};
// Script entry point: decides whether the import may run, performs it, and
// prints a summary. Any failure is reported on stderr and reflected in the
// process exit code so calling shells and pipelines can detect it.
(async () => {
    try {
        // Force is either `false` or the raw command-line string, so it is
        // normalised through toString() before the comparison.
        const force_requested = Force.toString().toLowerCase() === 'true';

        let proceed_with_import = force_requested;
        if (!force_requested) {
            // Without the force flag, only an empty table may be loaded.
            const table_scan = await ddb_table_has_items(TableName);
            proceed_with_import = ( table_scan.Count === 0 );
        }

        if (!proceed_with_import) {
            console.log("Skipped: '%s' already contains data; pass 'true' as the last argument to force the import.", TableName);
            return;
        }

        const ddb_inserts = await ddb_bulk_load(TableName, ImportFile);

        console.log("=".repeat(75));
        console.log("Completed: '%s' has been loaded into '%s'.", ImportFile, TableName);
        console.log(" Insert Attempted: %s", ddb_inserts.count_attempted);
        console.log(" Insert Succeeded: %s", ddb_inserts.count_succeeded);
        console.log(" Insert Failed : %s", ddb_inserts.count_failed);

        // A partially failed load is still a failure from the caller's view.
        if (ddb_inserts.count_failed > 0) {
            process.exitCode = 1;
        }
    } catch (error) {
        console.error(error);
        process.exitCode = 1;
    }
})();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment