Non-blocking, single-threaded data import from XML to PostgreSQL "template" using node.js
/*
This is a simple node program originally written to read a large volume of data
stored in XML files and import it into a PostgreSQL database.

It uses the async library to process files in parallel with a concurrency of 20.
Adjusting the concurrency can alter performance depending on the machine and the
exact workload.

The program is single threaded, so on multi-core machines you might want to
consider Node's built-in cluster module to spread the work across several worker
processes for a further increase in throughput (see the commented sketch below).

Further improvements are probably possible by batching inserts to the database,
i.e. rather than 1 insert per file, process multiple files and import a few
hundred rows at a time (see the commented sketch after updateDatabase).

I have had to remove the proprietary code that does the actual conversion from
XML to SQL, so the program in its current form doesn't actually do anything.
You would need to add your own code in the relevant places.
*/
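// The header mentions using multiple processes on multi-core machines; below is a
// minimal, commented-out sketch of how that might look with Node's built-in
// cluster module. The WORKER_INDEX / WORKER_COUNT env vars and the modulo file
// slicing are illustrative assumptions, not part of the original program.
/*
var cluster = require('cluster');
var numWorkers = require('os').cpus().length;

if (cluster.isMaster) {
  // fork one worker per CPU core, telling each which slice of files it owns
  for (var w = 0; w < numWorkers; w++) {
    cluster.fork({ WORKER_INDEX: w, WORKER_COUNT: numWorkers });
  }
} else {
  // in each worker: keep every Nth file so no file is imported twice,
  // then run the same eachLimit pipeline defined below
  var idx = parseInt(process.env.WORKER_INDEX, 10);
  var count = parseInt(process.env.WORKER_COUNT, 10);
  var myFiles = filelist.filter(function(f, i) { return i % count === idx; });
  async.eachLimit(myFiles, 20, processFile, final);
}
*/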
// include libraries
var fs = require("fs");
var cheerio = require('cheerio'); //used for xml / xhtml parsing
var S = require("string");
var pg = require('pg');
var async = require('async');
var moment = require('moment');
// Configuration
var input_dir = '/wherever/to_import';
var output_dir = '/somewherelse/imported';
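// connection string format: postgres://user:password@host/database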
var conString = "postgres://postgres:db_password@localhost/db_user";
var debug = false;
// Set up database connection
var client = new pg.Client(conString);
client.connect();
// initialise variables
var progress_counter = 0;
var start_time = moment();
var timer = null; // used to calculate etas / performance
// get list of files to process
var filelist = fs.readdirSync(input_dir);
// if in debug mode, only process the first 2 files
if (debug) filelist = filelist.slice(0, 2);
// kick off processing in parallel with a concurrency of 20
async.eachLimit(filelist, 20, processFile, final);
// process each input file
function processFile(filename, next) {
  // main processing sequence for each file
  async.waterfall([
    function(callback) {
      readFile(filename, callback);
    },
    extractData,
    validateData,
    updateDatabase,
    moveFile
  ],
  function(err, filename) {
    // called when the waterfall finishes, or as soon as any step returns an error;
    // either way, processing continues with the next file in the list
    if (err) {
      console.log("error processing " + filename, err);
    } else {
      if (debug) console.log("file processed successfully", filename);
    }
    next();
  });

  // display a progress counter every 1000 files
  if (progress_counter++ % 1000 == 0) {
    var msg;
    if (timer) {
      var time_per_thou = moment().diff(timer); // ms taken for the last 1000 files
      var remaining = (filelist.length - progress_counter) / 1000;
      var files_per_minute = parseInt(1000 / (time_per_thou / 60000));
      msg = "eta " + moment.duration(time_per_thou * remaining).humanize(true) + " avg: " + files_per_minute + " files/minute";
    } else {
      // first time through the loop
      msg = "Import started at " + moment().format("dddd, MMMM Do YYYY, h:mm:ss a");
    }
    timer = moment();
    console.log(progress_counter + " of " + filelist.length + " : " + msg);
  }
}
// called at end of processing for clean up / summary
function final() {
  // close database connection
  client.end();

  // generate summary
  var total_time = moment().diff(start_time);
  var files_per_minute = parseInt(filelist.length / (total_time / 60000));
  console.log(progress_counter + " files imported in " + moment.duration(total_time).humanize(true) + " avg: " + files_per_minute + " files/minute");
}
// read input file (TODO: this function is a bit superfluous)
function readFile(filename, next) {
  if (debug) console.log("read file " + input_dir + '/' + filename);
  fs.readFile(input_dir + '/' + filename, "utf8", function(err, data) {
    next(err, filename, data);
  });
}
// handle extraction of data from input file (add your code here)
function extractData(filename, data, next) {
  if (debug) console.log("extracting data " + filename);
  var $ = cheerio.load(data, {xmlMode: true});
  var result = {};
  result.field1 = $("Field1").first().text() || null;
  result.field2 = $("Field2").first().text() || null;
  result.field3 = $("Field3").first().text();
  // etc
  next(null, filename, result);
}
// validate the extracted data
function validateData(filename, json, next) {
  var err = null;
  if (!json.whatever) {
    err = "whatever not valid";
  }
  if (err) err += " in " + filename;
  next(err, filename, json);
}
// add the data to the database
function updateDatabase(filename, data, next) {
  var cols = [], values = [], placeholders = [], query;
  var i = 1;
  for (var d in data) {
    cols.push(d);
    values.push(data[d]);
    placeholders.push("$" + i++);
  }

  // generate SQL command
  query = 'INSERT INTO my_table ("' + cols.join('","') + '") VALUES (' + placeholders.join(',') + ')';
  query += ' RETURNING id';

  // note: giving the query a name makes pg prepare it once and reuse it,
  // which assumes every file produces the same set of columns
  client.query({
    name: "importdata",
    text: query,
    values: values
  }, function(err, result) {
    if (debug && err) console.log("values", data);
    next(err, filename);
  });
}
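// The header also suggests batching inserts; here is a minimal, commented-out
// sketch of a multi-row INSERT as one way to do that. The insertBatch name and
// batch handling are illustrative assumptions, not part of the original program,
// and it assumes every row shares the same column set.
/*
function insertBatch(rows, done) {
  var cols = Object.keys(rows[0]);
  var values = [], placeholders = [];
  rows.forEach(function(row, r) {
    // build one "($1,$2,...)" group per row, numbering parameters consecutively
    var group = cols.map(function(c, i) {
      values.push(row[c]);
      return "$" + (r * cols.length + i + 1);
    });
    placeholders.push("(" + group.join(",") + ")");
  });
  var sql = 'INSERT INTO my_table ("' + cols.join('","') + '") VALUES ' + placeholders.join(",");
  client.query(sql, values, done);
}
*/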
// move processed files to another folder to prevent re-processing
function moveFile(filename, next) {
  fs.rename(input_dir + '/' + filename, output_dir + '/' + filename, function(err) {
    next(err, filename);
  });
}