Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Parsing BIG compressed XML data (OpenStreetMap) with Node.js
#!/usr/bin/env node
/*
* Parsing BIG compressed data with Node.js and SAX
* Probably some bugs, but worked fine for OpenStreetMap Great Britain bz2
* Greg Miell 2012
*/
// Fallback String#trim for runtimes that do not provide one.
// The assignment is guarded so that an existing (native) trim is never
// overwritten by this regex-based version.
if (typeof String.prototype.trim !== 'function') {
  String.prototype.trim = function () {
    // Strip leading and trailing whitespace, leave interior whitespace alone
    return this.replace(/^\s+|\s+$/g, '');
  };
}
var sax = require('sax')
, strict = true
, gzbz2 = require('gzbz2/bunzipstream')
// Had to manually build gzbz2 from another repo for Node 0.6.x
// https://github.com/tungj/node-gzbz2
, fs = require('fs');
// Create decompression stream over the bzip2-compressed OSM extract
// NOTE(review): `lowercasetags` is the option passed to sax below; here it
// looks copy-pasted and is presumably ignored by gzbz2 - confirm before removing.
var dataStream = gzbz2.wrap('./great_britain.osm.bz2', {lowercasetags: true});
// Create SAX parser stream (strict mode, see `strict` above)
var saxStream = sax.createStream(strict, {lowercasetags: true});
// Create output stream that receives the extracted address lines
var outStream = fs.createWriteStream('./addresses.dump');
// Counters for the periodic progress report; not required for parsing itself
var chars_proc = 0; // sum of `length` over all decompressed chunks seen so far
var seconds = 0;    // elapsed seconds, advanced by the reporting timer
var addresses = 0;  // number of address records started
// Tally the size of each chunk as it arrives from the decompression stream
dataStream.on('data', function (chunk) {
  chars_proc = chars_proc + chunk.length;
});
// Output batching state: lines are collected in a string and handed to the
// output stream in larger pieces.
var paused = false; // set when outStream.write() reported a full buffer; cleared by the 'drain' listener
var p_buffer = '';  // text accepted by write_file() but not yet passed to outStream
// Queue one line of output (a newline is appended) and pass the queue on to
// the output stream once it has grown beyond 1024 characters.
// While `paused` is set the line is only queued.
function write_file(data) {
  p_buffer += data + '\n';
  if (paused || p_buffer.length <= 1024) {
    return;
  }
  var accepted = outStream.write(p_buffer);
  if (!accepted) {
    // The stream asked us to back off until it emits 'drain'
    paused = true;
  }
  p_buffer = '';
}
// Resume output once the stream has drained.
// write_file() only queues text while `paused` is set, and it is the only
// other place that flushes, so the backlog is written out right here rather
// than waiting for a later write_file() call that may never come.
outStream.on('drain', function () {
  paused = false;
  if (p_buffer.length > 0) {
    var pending = p_buffer;
    p_buffer = '';
    // A false return means the stream is full again; wait for the next 'drain'
    if (!outStream.write(pending)) {
      paused = true;
    }
  }
});
// Progress report: every 10 seconds log the amount of data read (in MiB),
// the elapsed time, the number of addresses found and the size of the
// pending output buffer.
var interval = setInterval(function report_stats() {
  seconds = seconds + 10;
  var megabytes = Math.round(chars_proc/1024/1024);
  var pending = p_buffer.length;
  console.log('Processed', megabytes, 'mb in', seconds, 'seconds with', addresses, 'addresses', pending, 'bytes buffered');
}, 10000);
// State variables and regexes for parsing OSM data
var addr = false             // true while the parser is inside a "node" element
  , addr_opened = false      // true once the "addr:" header line was written for the current node
  , addr_re = /^addr\:/      // keys that start with "addr:"
  , crlf_re = /\r\n|\r|\n/g; // any line break; global so every occurrence in a value is escaped
// Parse tags as they are opened: "node" tags begin new address data, "tag"
// tags carry the data we want to dump.
// For every addr:* tag inside a node one line "\t<key>|<value>" is written,
// preceded by a single "addr:" header line per node.
saxStream.on('opentag', function (node) {
  switch (node.name) {
    case 'node':
      addr = true;
      break;
    case 'tag':
      var k = node.attributes.k
        , v = node.attributes.v;
      if (addr && k && k.match && k.match(addr_re)) {
        if (!addr_opened) {
          write_file('addr:');
          addresses += 1;
          addr_opened = true;
        }
        // Strings are immutable, so the cleaned value must be assigned back.
        // Line breaks are escaped as a literal "\n" to keep one tag per
        // output line; a missing value is written as an empty string.
        v = v ? String(v).replace(crlf_re, '\\n').trim() : '';
        write_file('\t' + k + '|' + v);
      }
      break;
  }
});
// Leaving a "node" element ends the current address record: clear both the
// inside-node flag and the header-written flag. Other closing tags are ignored.
saxStream.on('closetag', function (name) {
  if (name !== 'node') {
    return;
  }
  addr_opened = false;
  addr = false;
});
// On SAX stream end: stop the stats timer, write out whatever is still
// queued and close the output file.
saxStream.on('end', function () {
  clearInterval(interval);
  // write_file() only passes text on once more than 1024 characters are
  // queued (and never while paused), so the tail of the output is normally
  // still sitting in p_buffer at this point and would otherwise be lost.
  if (p_buffer.length > 0) {
    outStream.write(p_buffer);
    p_buffer = '';
  }
  outStream.end();
});
// Start piping data from the decompression stream into the SAX parser stream
dataStream.pipe(saxStream);
// That's all folks!
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.