Skip to content

Instantly share code, notes, and snippets.

@ChristianRich
Last active April 26, 2017 07:15
Show Gist options
  • Save ChristianRich/a7e086c76ecd0db1c78ae5e15b11606d to your computer and use it in GitHub Desktop.
Save ChristianRich/a7e086c76ecd0db1c78ae5e15b11606d to your computer and use it in GitHub Desktop.
Reads CSV files and saves them as SQLite3 tables in an on-disk database
import parse from 'csv-parse';
import fs from 'fs';
import sqlite3 from 'sqlite3';
import async from 'async';
import csvHeaders from 'csv-headers';
import path from 'path';
let count = 0;
/**
* Builds a Sqlite3 database table from a csv file
* It's important to note that with sqlite3 you can only build one table at a time
* @param {string} database - path to database
* @param {string} csvFile - path to csv file
* @param {int} maxRecords - Cap dataset
* @param {string=} delimiter
* @return {*}
*/
module.exports = (database, csvFile, maxRecords = 250, delimiter = ',') => {
++count;
let dbfn = database,
csvfn = csvFile,
tblnm;
// Extract the table name from the filename
try{
tblnm = path.parse(csvfn).name;
} catch(e){
}
return new Promise((resolve, reject) => {
// console.log('#1 Loading file ' + csvfn);
if(!tblnm){
return reject('Unable to determine table name');
}
const opts = {
file: csvfn,
delimiter: delimiter
};
return csvHeaders(opts, (err, headers) => {
if(err){
return reject(err);
}
resolve({headers});
});
})
.then((context) => {
return new Promise((resolve, reject) => {
// console.log('#2 Create new or load existing database ' + dbfn);
const db = new sqlite3.Database(dbfn);
db.on('error', err =>{
reject(err);
});
db.on('open', () => {
context.db = db;
resolve(context);
});
});
})
.then((context) => {
return new Promise((resolve, reject) => {
// console.log(`#3 DROP TABLE IF EXISTS ${tblnm}`);
context.db.run(`DROP TABLE IF EXISTS ${tblnm}`, [], (err) => {
if(err){
console.log(err);
return reject(err);
}
resolve(context);
});
});
})
.then((context) => {
return new Promise((resolve, reject) => {
// console.log('#4 Reading fields');
let fields = '',
fieldnms = '',
qs = '';
context.headers.forEach((hdr) => {
hdr = hdr.replace(' ', '_');
if(fields !== '') fields += ',';
if(fieldnms !== '') fieldnms += ',';
if(qs !== '') qs += ',';
fields += ` ${hdr} TEXT`;
fieldnms += ` ${hdr}`;
qs += ' ?';
});
context.qs = qs;
context.fieldnms = fieldnms;
context.db.run(`CREATE TABLE IF NOT EXISTS ${tblnm} ( ${fields} )`, [], (err) => {
if(err){
console.log(err);
return reject(err);
}
resolve(context);
});
});
})
.then((context) => {
return new Promise((resolve, reject) => {
// console.log('#5 Load and parse CSV');
const opts = {
delimiter: delimiter,
columns: true,
relax_column_count: true
};
fs
.createReadStream(csvfn)
.pipe(parse(opts, (err, data) => {
if(err){
console.log(err + ' in file ' + csvfn);
return resolve();
}
if(data.length > maxRecords){
console.log(`Capping records to ${maxRecords} for table '${tblnm}'`);
data = data.splice(0, maxRecords);
}
console.log(`[${count}] Inserting ${data.length} records into table '${tblnm}'`);
if(data.length === 0){
return resolve(context);
}
async.eachSeries(data, (datum, next) => {
let d = [];
context.headers.forEach((hdr) =>{
d.push(datum[hdr]);
});
// console.log(`INSERT INTO ${tblnm} ( ${context.fieldnms} ) VALUES ( ${context.qs} ) ${d}`)
context.db.run(`INSERT INTO ${tblnm} ( ${context.fieldnms} ) VALUES ( ${context.qs} )`, d, (err) => {
if(err){
console.error(err);
return next(err);
}
next();
});
},
(err) => {
if(err){
return reject(err);
}
resolve(context);
});
}));
});
})
.then((context) => {
return new Promise((resolve) => {
// console.log('#6 close db');
if(context && context.db){
context.db.close();
}
resolve();
})
})
.catch((err) => {
throw err;
});
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment