Skip to content

Instantly share code, notes, and snippets.

@barrbrain
Created March 15, 2011 13:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save barrbrain/870730 to your computer and use it in GitHub Desktop.
Save barrbrain/870730 to your computer and use it in GitHub Desktop.
Simple XML record stream processor. Uses node-binary, node-postgres, xml2js-expat and chain-gang.
// Module dependencies and settings, declared in one `var` statement so that
// none of the names leak into the global scope.
var net = require('net'),
    xml2js = require('xml2js-expat'),
    Binary = require('binary'),
    pg = require('pg').native,
    chainGang = require('chain-gang'),
    // Connection string handed to pg.connect().
    conString = "tcp://postgres:postgres@localhost/postgres",
    // Number of chain workers; also the number of terminate jobs queued on close.
    workers = 2;
// Job queue created with `workers` workers; insert and terminate jobs are added to it.
var chain = chainGang.create({workers: workers});
// Report failed jobs instead of silently discarding them.
chain.addListener('error', function(name, error) {
  console.log(name + ' has failed: ' + error);
});
chain.addListener('finished', function(name, value) {
  console.log(name + ' has finished');
});
/**
 * Inserts one parsed record into the `test` table, then marks the job as done.
 *
 * @param {object} worker - Job worker; must carry a client on `worker.db`
 *   and expose `finish()`.
 * @param {object} record - Parsed record; only `record.name` is stored.
 */
var insert = function(worker, record) {
  // Transform and Load goes here
  console.log('INSERT ' + record.name);
  worker.db.query('INSERT INTO test values($1)', [record.name], function(err, result) {
    if (err) {
      // Surface failed inserts rather than dropping the error.
      console.log(err);
    }
    // Clear the cached client so the next job on this worker connects again.
    // NOTE(review): this defeats the `worker.db` reuse check in insertMessage — confirm intent.
    worker.db = null;
    worker.finish();
  });
};
/**
 * Builds a chain job that stores `record` via insert().
 *
 * The returned job reuses the client cached on `worker.db` when there is one;
 * otherwise it connects with pg.connect(conString, ...) first and caches the
 * client on the worker.
 *
 * @param {object} record - Parsed record to insert.
 * @returns {function(object): void} Job function taking the chain worker.
 */
var insertMessage = function(record) {
  console.log('CREATE JOB ' + record.name);
  return function(worker) {
    if (worker.db) {
      insert(worker, record);
      return;
    }
    console.log('CONNECT DB');
    pg.connect(conString, function(err, db) {
      if (err) {
        console.log(err);
        // No client is available, so end the job here rather than falling
        // through to insert(), which would dereference a missing worker.db.
        worker.finish();
        return;
      }
      worker.db = db;
      insert(worker, record);
    });
  };
};
// Open the input connection on TCP port 7000.
var tcp = net.createConnection(7000);
tcp.addListener("connect", function() {
  // Running job number, used to give every queued insert job a distinct name.
  var count = 0;
  Binary.stream(tcp)
    .loop(function(end) {
      // Each frame: a 16-bit length field `l`, then `l` bytes of XML in `x`.
      this
        .word16lu('l')
        .buffer('x', 'l')
        .tap(function(vars) {
          var parser = new xml2js.Parser();
          parser.addListener('end', function(result, error) {
            if (error) {
              // Check the error before touching `result`, which may be
              // missing when parsing failed, and stop the read loop.
              end(error);
              return;
            }
            console.log('PARSED RECORD ' + result.name);
            chain.add(insertMessage(result), 'insertMessage'+(count++));
          });
          parser.parseBuffer(vars.x, true);
        });
    });
});
// When the input socket closes, queue one terminate job per worker so that
// any client still cached on a worker is closed.
tcp.addListener("close", function() {
  var i;
  var terminate = function(worker) {
    // The client lives on the worker; there is no free-standing `db` here.
    if (worker.db)
      worker.db.end();
    // NOTE(review): worker.finish() is never called here — presumably so each
    // worker takes exactly one terminate job; confirm against chain-gang.
  };
  for (i=0; i<workers; i++)
    chain.add(terminate, 'terminate'+i);
});
<card xmlns="http://businesscard.org">
<name>John Doe</name>
<title>CEO, Widget Inc.</title>
<email>john.doe@widget.com</email>
<phone>(202) 456-1414</phone>
<logo url="widget.gif"/>
</card>
#!/bin/bash
# Serve ten copies of header.bin + sample.xml on TCP port 7000 in the
# background, then run the loader script.
{
  for i in {1..10}; do
    cat header.bin sample.xml
  done
  # Keep the pipe open for a second after the last record is written.
  sleep 1
} | nc -l 7000 &
node load.js
@barrbrain
Copy link
Author

This sample achieves about 700 records/sec with a 206-byte payload, of which 8 bytes are inserted, on my 3-year-old laptop.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment