Skip to content

Instantly share code, notes, and snippets.

@kirbysayshi
Created April 18, 2014 15:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kirbysayshi/11049085 to your computer and use it in GitHub Desktop.
Save kirbysayshi/11049085 to your computer and use it in GitHub Desktop.
var through = require('through');
var path = require('path');
var ss = require('stream-stream');
var avro = require('node-avro-io');
var hdfs = require('webhdfs').createClient({
host: '' // stuff goes here
});
// Demo pipeline: stream every file under one HDFS partition directory through
// the Avro data-file reader and print each decoded record to stdout.
var partitionStream = avrostream2('/log/ap/2014-04-01/01/ClientEvent', hdfs);
var decodedRecords = partitionStream.pipe(new avro.DataFile.Reader());
decodedRecords.on('data', console.log.bind(console));
/**
 * Build a single stream out of every file found in an HDFS directory.
 *
 * The directory is listed with `hdfsClient.readdir`; a read stream is opened
 * for each entry (in listing order) and written into a `stream-stream`
 * instance, which is returned immediately. Presumably stream-stream then
 * emits the contents of the written streams one after another -- confirm
 * against the stream-stream documentation.
 *
 * @param {string} partpath - Directory path to list on HDFS.
 * @param {object} hdfsClient - Client exposing `readdir(path, cb)` and
 *   `createReadStream(path)` (e.g. a `webhdfs` client).
 * @returns {object} The stream-stream instance. It emits 'error' and is ended
 *   if the listing fails or yields no files.
 */
function avrostream2(partpath, hdfsClient) {
  var out = ss();

  // Use the injected client, not the module-level `hdfs`, so the function
  // works with whichever client the caller supplies.
  hdfsClient.readdir(partpath, function (err, files) {
    if (err) {
      return emitErr(err);
    }

    // Treat a missing listing the same as an empty one instead of throwing
    // a TypeError on `files.length`.
    if (!files || !files.length) {
      return emitErr(new Error('No files found at ' + partpath + '\n' + JSON.stringify(files)));
    }

    var fstreams = files.map(function (fstat) {
      var fname = fstat.pathSuffix;
      var fstream = hdfsClient.createReadStream(path.join(partpath, fname));
      // Hold each source paused while it is queued; presumably this keeps
      // data from flowing before the combined stream gets to it.
      fstream.pause();
      out.write(fstream);
      return fstream;
    });

    // Only the first source is started explicitly here.
    fstreams[0].resume();
  });

  // Report a failure on the returned stream and close it.
  function emitErr(err) {
    out.emit('error', err);
    out.end();
  }

  return out;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment