Skip to content

Instantly share code, notes, and snippets.

@retrohacker
Last active August 29, 2015 14:01
Show Gist options
  • Save retrohacker/3329c61bceb94eb095b4 to your computer and use it in GitHub Desktop.
Save retrohacker/3329c61bceb94eb095b4 to your computer and use it in GitHub Desktop.
/**
* Refer to comment for description of XML file.
* This script loads in a medium size (23 GB) XML file and parses out timestamps (w/o date) as a map.
* The timestamps are the creation dates of each entity in the XML document.
*/
// Shared state: the master fills `queue` with raw XML lines and reduces
// worker results into `timestampMap`; `needWork` holds idle worker handles.
var cluster = require('cluster');
var queue = [];
var timestampMap = {};
var threads = 11;
var needWork = [];

/**
 * Dispatch on process role: the master process coordinates reading and
 * reducing; each forked worker runs the map step.
 */
if (cluster.isMaster) {
  master();
} else {
  console.log("Worker " + cluster.worker.id + " started");
  map();
}
/**
 * Master reads in data from posts.xml interpreting an entry as a single line
 * of the file. It spawns off worker processes, each responsible for
 * ~*chunksize* worth of entries, and finally writes the reduced key-value
 * pairs to the HDD at output.data.
 */
function master() {
  for(var i = 0; i < threads; i++) {
    needWork.push(cluster.fork())
  }
  var fs = require('fs')
  var stream = fs.createReadStream('posts.xml')
  // Decode as UTF-8 on the stream itself so a multi-byte character split
  // across two chunks is not corrupted by per-chunk Buffer->string coercion.
  stream.setEncoding('utf8')
  var raw = ""          // trailing partial line carried between chunks
  var progress = 0      // count of lines handed out so far
  var chunksize = 5000  // minimum batch size before dispatching a worker
  stream.on('data', function(chunk) {
    raw += chunk
    var arr = raw.split("\n")  // 'var' added: was an implicit global
    raw = arr.pop()            // last element is an incomplete line
    queue = queue.concat(arr)
    if(queue.length>chunksize) {
      progress+=queue.length
      startWorker(workerComplete(stream))
      // No idle workers left: stop reading until one reports back.
      if(needWork.length==0) {stream.pause()}
    } //end if
  }) //end stream.on
  stream.on('end',function() {
    queue = queue.concat(raw.split("\n"))
    // BUG FIX: getTimestamps() was called without its queue argument (a
    // TypeError on undefined.forEach) and its return value was discarded.
    // Process the leftover lines and fold the counts into the final map,
    // mirroring what workerComplete does for worker results.
    var leftover = getTimestamps(queue)
    queue = []
    Object.keys(leftover).forEach(function(key) {
      timestampMap[key] = (timestampMap[key] || 0) + leftover[key]
    })
    // NOTE(review): workers dispatched shortly before 'end' may still be in
    // flight here, so their results could arrive after this write — confirm.
    console.log("Writing data to FS")
    fs.writeFile("output.data",JSON.stringify(timestampMap),function(err) {
      if(err) { console.log("Failed to write output.data: "+err) }
    })
  })
}
/**
 * Spawn a batch on an idle worker, sending it the entire queue accumulated
 * up to this point, then empty the queue.
 * @param callback - curried handler factory (see workerComplete); invoked
 *   with the chosen worker to produce its 'message' listener.
 */
function startWorker(callback) {
  var worker = needWork.pop()  // 'var' added: was an implicit global
  worker.on('message', callback(worker))
  console.log("Sending "+queue.length+" to "+worker.id)
  worker.send({msg:queue})
  queue=[]
}
/**
 * Curried handler run when a worker sends its result back to the master.
 * Returns the worker to the idle pool, resumes the input stream, and reduces
 * the worker's {timestamp: count} pairs into the shared timestampMap{}.
 *
 * BUG FIX: the original Object.keys(msg).forEach(function(value, index))
 * had forEach's arguments backwards — forEach passes (element, arrayIndex),
 * so the code wrote timestampMap[0] = "12:34:56" instead of accumulating
 * counts under the timestamp keys. Iterate the keys and read msg[key].
 */
function workerComplete(stream) {
  return function(worker) {
    return function(msg) {
      worker.removeAllListeners()
      needWork.push(worker);
      stream.resume();
      console.log("Master received: "+msg)
      Object.keys(msg).forEach(function(key) {
        if(typeof timestampMap[key] === "undefined") {
          timestampMap[key] = msg[key]
        } else {
          timestampMap[key] += msg[key]
        } //end if
      }) //end Object.keys.forEach
    } //end function()
  } //end function(id)
}
/**
 * Sole entry point for a child process: subscribe to batches of XML
 * entities sent down from the master, handling each one in decodeXML.
 */
function map() {
  process.on('message', decodeXML);
}
/**
 * Worker-side handler: decodes an array of XML entities, extracting
 * {timestamp: count} pairs, and sends the resulting map back to the master.
 *
 * NOTE(review): the original comment claimed the worker "kills itself" when
 * done, but nothing here exits or disconnects — the worker stays alive for
 * further batches, which is what the master's needWork pool expects.
 */
function decodeXML(msg) {
  console.log(cluster.worker.id+" received message: ",msg.msg.length)
  var batch = msg.msg               // 'var' added: was an implicit global
  var result = getTimestamps(batch) // 'var' added: was an implicit global
  process.send(result)
}
/**
 * Map step: count how many entities carry each creation time-of-day.
 * @param queue - array of XML entity strings (one <row .../> per element)
 * @returns map of "HH:MM:SS" -> occurrence count
 *
 * NOTE(review): this relies on xml2js.parseString invoking its callback
 * synchronously so that 'timestamps' is populated before it is returned —
 * confirm against the xml2js version in use. Parse errors are silently
 * skipped: on error, result is undefined and parseRow returns null.
 */
function getTimestamps(queue) {
  var xml2js = require('xml2js')
  var timestamps = {}  // 'var' added: was an implicit global
  queue.forEach(function(val) {
    xml2js.parseString(val,function(err,result) {
      var time = parseRow(result)  // 'var' added: was an implicit global
      if(time == null) {return}
      // BUG FIX: the original tested timestampMap[time] (the master's
      // global reduce map, always empty inside a worker) instead of the
      // local timestamps map, so every count stuck at 1.
      if(typeof timestamps[time]==="undefined") {
        timestamps[time] = 1
      } else {
        timestamps[time]++
      }
    }) // end xml2js.parseString()
  }) //end queue.forEach
  return timestamps
}
/**
 * Extract the time-of-day from a single parsed XML entity.
 * Expects xml2js output shaped like
 *   {row: {$: {CreationDate: "YYYY-MM-DDTHH:MM:SS.mmm"}}}.
 * @returns the "HH:MM:SS" portion of CreationDate, or null when the entity
 *   is missing, malformed, or has no "T" date/time separator.
 */
function parseRow(result) {
  if( result!=null &&
      typeof result !== "undefined" &&
      typeof result["row"] !== "undefined" &&
      typeof result["row"]["$"] !== "undefined" &&
      typeof result["row"]["$"]["CreationDate"] !== "undefined") {
    var split = result["row"]["$"]["CreationDate"].split("T")  // 'var' added: was an implicit global
    if(split.length>1) {
      var time = split[1].split(".")[0]  // strip fractional seconds
      return time
    } //end if
  } //end if
  return null
}
@retrohacker
Copy link
Author

Problem:

I don't know how to throttle thread creation or the rate at which data is read in from the HDD. I quickly run out of memory while forking children.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment