Skip to content

Instantly share code, notes, and snippets.

@jfarid27
Last active October 17, 2015 17:30
Show Gist options
  • Save jfarid27/30ed7854229f3bbe5393 to your computer and use it in GitHub Desktop.
Save jfarid27/30ed7854229f3bbe5393 to your computer and use it in GitHub Desktop.
var fs = require("fs");
var path = require("path");
var Readable = require("stream").Readable;
var zlib = require("zlib");
var Async = require("async");
var combine = require("combine-streams");
var jsonstream = require("json-stream");
/**
 * Build an async-style task that scans one file of the cache directory.
 *
 * The task only processes files whose name ends with ".json.gz" and contains
 * `dateprefix`; every other file is skipped. For a matching file the
 * decompressed contents are piped through `jsonstream()`, and every value it
 * emits that passes `filter` is pushed onto `results`.
 *
 * @param {string} cachedir - Directory that contains `file`.
 * @param {string} file - File name inside `cachedir`.
 * @param {string} dateprefix - Substring the file name must contain.
 * @param {function(*): boolean} filter - Predicate applied to each emitted value.
 * @param {stream.Readable} results - Object-mode stream that receives accepted values.
 * @returns {function(function(?Error))} Task that invokes its callback exactly
 *   once: with no error when the file is skipped or fully processed, or with
 *   the error raised while reading, decompressing or parsing the file.
 */
var work = function (cachedir, file, dateprefix, filter, results) {
  return function (callback) {
    var matches = file.endsWith(".json.gz") && file.indexOf(dateprefix) > -1;
    if (!matches) {
      // A skipped file must still report completion, otherwise the series
      // never advances. Deferred so that long runs of skipped files do not
      // complete synchronously inside each other.
      setImmediate(callback, null);
      return;
    }

    // Both the error and the end path can fire; report only the first outcome.
    var finished = false;
    var done = function (err) {
      if (finished) {
        return;
      }
      finished = true;
      callback(err || null);
    };

    console.log("Scanning:", file);
    // path.join works whether or not cachedir ends with a separator.
    var scanner = scangz(path.join(cachedir, file));
    scanner.on('error', function (err) {
      console.log(err);
      done(err);
    });

    var scannerp = scanner.pipe(jsonstream());
    scannerp.on('error', function (err) {
      console.log(err);
      console.log(err.stack);
      done(err);
    });
    scannerp.on('data', function (chunk) {
      if (filter(chunk)) {
        results.push(chunk);
      }
    });
    // Wait for the parser (not the unzip stream) to end so that every parsed
    // value has been delivered before the task reports completion.
    scannerp.on('end', function () {
      console.log("Done with", file);
      done(null);
    });
  };
};

/**
 * Open a compressed file and return a stream of its decompressed contents.
 *
 * @param {string} fname - Path of the compressed file to read.
 * @returns {stream.Transform} The `zlib.createUnzip()` stream fed by the file.
 *   Errors raised while reading the file are re-emitted on this stream.
 */
var scangz = function (fname) {
  var rawread = fs.createReadStream(fname);
  var unzipped = zlib.createUnzip();
  // pipe() does not forward errors from the source, so surface read failures
  // (missing file, permission denied, ...) on the stream the caller holds.
  rawread.on('error', function (err) {
    unzipped.emit('error', err);
  });
  return rawread.pipe(unzipped);
};

/**
 * Scan every matching ".json.gz" file in a directory and stream the accepted
 * values.
 *
 * @param {string} cachedir - Directory to scan.
 * @param {string} dateprefix - Substring a file name must contain to be scanned.
 * @param {function(*): boolean} filter - Predicate deciding which parsed values
 *   are emitted.
 * @returns {stream.Readable} Object-mode stream of accepted values. It ends
 *   once all files have been processed and emits 'error' if the directory
 *   cannot be listed or a file fails to process.
 */
module.exports.scanner = function (cachedir, dateprefix, filter) {
  var results = new Readable({objectMode: true});
  // Data is pushed as files are parsed; there is nothing to do on demand.
  results._read = function noop() {};

  fs.readdir(cachedir, function (err, files) {
    if (err) {
      // Throwing inside this callback could not be caught by the caller;
      // report the failure on the returned stream instead.
      results.emit('error', err);
      return;
    }

    // One task per directory entry, run strictly one after another so that
    // only a single file is open and being parsed at any time.
    var workload = files.map(function (file) {
      return work(cachedir, file, dateprefix, filter, results);
    });

    Async.series(workload, function (err) {
      if (err) {
        results.emit('error', err);
        return;
      }
      // Signal end-of-stream so consumers receive 'end'.
      results.push(null);
    });
  });

  return results;
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment