Created
March 15, 2011 13:46
-
-
Save barrbrain/870730 to your computer and use it in GitHub Desktop.
A simple XML record stream processor, built on node-binary, node-postgres, xml2js-expat and chain-gang.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Dependencies and configuration for the XML -> PostgreSQL loader.
var net = require('net'),
    xml2js = require('xml2js-expat'),
    Binary = require('binary'),
    pg = require('pg').native,
    chainGang = require('chain-gang'),
    // Connection string for the target PostgreSQL database.
    conString = "tcp://postgres:postgres@localhost/postgres",
    // Number of chain-gang workers; also the number of terminate jobs
    // queued when the input stream closes. (The comma on the previous line
    // keeps this in the `var` list instead of leaking an implicit global.)
    workers = 2;
// Job queue. Each queued job is a function that receives a worker and
// must call worker.finish() when it is done.
var chain = chainGang.create({workers: workers});
// Report failed jobs instead of silently discarding the error.
chain.addListener('error', function(name, error) {
  console.error('Job ' + name + ' failed: ' + error);
});
chain.addListener('finished', function(name, value) {
  console.log(name + ' has finished');
});
// Inserts one parsed record using the worker's client (worker.db), then
// finishes the chain-gang job.
//   worker - chain-gang worker; worker.db must be a connected pg client.
//   record - parsed XML record; only record.name is stored.
var insert = function(worker, record) {
  // Transform and Load goes here
  console.log('INSERT ' + record.name);
  worker.db.query('INSERT INTO test values($1)', [record.name], function(err, result) {
    if (err) {
      // Surface the failure instead of ignoring it; the job still finishes.
      console.error('INSERT ' + record.name + ' failed: ' + err);
    }
    // NOTE(review): the client is dropped after every insert, so the next job
    // on this worker calls pg.connect again; presumably this lets pg's pool
    // reclaim the client — confirm.
    worker.db = null;
    worker.finish();
  });
};
// Builds a chain-gang job that inserts `record`. The job reuses worker.db
// when it is set; otherwise it first obtains a client with pg.connect.
// If connecting fails, the job logs the error and finishes without inserting.
var insertMessage = function(record) {
  console.log('CREATE JOB ' + record.name);
  return function(worker) {
    if (worker.db) {
      insert(worker, record);
      return;
    }
    console.log('CONNECT DB');
    pg.connect(conString, function(err, db) {
      if (err) {
        // Finish exactly once and skip the insert: insert() would otherwise
        // call .query on a missing client and finish the job a second time.
        console.log(err);
        worker.finish();
        return;
      }
      worker.db = db;
      insert(worker, record);
    });
  };
};
// Connect to the record source on port 7000 and queue one insert job per record.
var tcp = net.createConnection(7000);
// Without an 'error' listener, a refused or reset connection is emitted as an
// unhandled 'error' event and crashes the process; log it instead. Node emits
// 'close' right after 'error', so the close handler still runs.
tcp.addListener("error", function(err) {
  console.error('TCP ' + err);
});
tcp.addListener("connect", function() {
  var count = 0; // makes each queued job name unique
  // Wire format: a 16-bit little-endian length, then that many bytes of XML.
  Binary.stream(tcp)
    .loop(function(end) {
      this
        .word16lu('l')
        .buffer('x', 'l')
        .tap(function(vars) {
          var parser = new xml2js.Parser();
          parser.addListener('end', function(result, error) {
            if (error) {
              // Check the error before touching `result`, which may be
              // missing when parsing failed; then stop the read loop.
              end(error);
              return;
            }
            console.log('PARSED RECORD ' + result.name);
            chain.add(insertMessage(result), 'insertMessage' + (count++));
          });
          parser.parseBuffer(vars.x, true);
        });
    });
});
// When the input stream closes, queue `workers` terminate jobs that close any
// client still held by the worker that runs them. NOTE(review): chain-gang
// does not necessarily route one terminate job to each distinct worker.
tcp.addListener("close", function() {
  var i;
  var terminate = function(worker) {
    if (worker.db) {
      // The client lives on the worker; a bare `db` here is undeclared.
      worker.db.end();
      worker.db = null;
    }
    // Like every other job in this file, terminate must call finish().
    worker.finish();
  };
  for (i = 0; i < workers; i++) {
    chain.add(terminate, 'terminate' + i);
  }
});
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<card xmlns="http://businesscard.org">
  <name>John Doe</name>
  <title>CEO, Widget Inc.</title>
  <email>john.doe@widget.com</email>
  <phone>(202) 456-1414</phone>
  <logo url="widget.gif"/>
</card>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# Serve ten copies of the sample record on port 7000, then run the loader.
# NOTE(review): header.bin presumably holds the 16-bit little-endian length of
# sample.xml expected by load.js — confirm.
{ for (( i=0; i<10; ++i )); do cat header.bin sample.xml ; done; sleep 1; } | nc -l 7000 &
# Give nc a moment to start listening; otherwise node can connect before the
# port is open and fail with ECONNREFUSED.
sleep 0.5
node load.js
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
On my three-year-old laptop, this sample processes about 700 records/sec, with a 206-byte payload per record of which 8 bytes are inserted.