Experimenting with streams in order to read the contents of all files in a directory.
Running `node index.js` lists all the files in the ./files
directory and streams out their concatenated contents.
It expects all the files to be compressed with gzip.
var zlib = require('zlib'); | |
var util = require('util'); | |
var fs = require('fs'); | |
var stream = require('stream'); | |
var async = require('async'); | |
/**
 * Legacy-style stream that accepts filenames on its writable side and emits
 * the contents of those files, one file after another, as 'data' events.
 *
 * Files are read through an async queue with a concurrency of 1, so their
 * contents are emitted in the order the filenames were written.
 *
 * @param {function(string): Stream} [transform] Optional factory called with
 *   each filename. The stream it returns is piped after the file read stream,
 *   and its output is emitted instead of the raw file bytes.
 * @param {string} [encoding] Accepted for interface compatibility; currently
 *   unused.
 */
function FileStream(transform, encoding) {
  stream.Stream.apply(this);
  this.readable = true;
  this.writable = true;
  var self = this;
  this._queue = async.queue(function (filename, callback) {
    // The empty string is the sentinel pushed by end(); there is nothing to read.
    if (filename === '') return callback();

    // A failing stream may emit both 'error' and 'end', and with a transform
    // there are two possible error sources; report completion only once.
    var finished = false;
    function done(err) {
      if (finished) return;
      finished = true;
      callback(err);
    }

    var source = fs.createReadStream(filename);
    var output = source;
    // pipe() does not forward errors, so the file stream needs its own
    // listener; otherwise a missing file would crash the process.
    source.on('error', done);
    if (transform) {
      output = transform(filename);
      output.on('error', done);
      source.pipe(output);
    }
    output.on('data', function (data) {
      self.emit('data', data);
    });
    output.on('end', function () {
      done();
    });
  }, 1);
}
util.inherits(FileStream, stream.Stream);

/**
 * Queues a file for reading. Failures while reading it are re-emitted as
 * 'error' events on this stream.
 *
 * @param {string} chunk Path of the file whose contents should be emitted.
 * @returns {boolean} Always true; filenames are queued, so the source never
 *   needs to pause.
 */
FileStream.prototype.write = function write(chunk) {
  var self = this;
  this._queue.push(chunk, function (err) {
    if (err) return self.emit('error', err);
  });
  return true;
};

/**
 * Signals that no more filenames will be written. 'end' is emitted once every
 * previously queued file has been processed.
 */
FileStream.prototype.end = function end() {
  var self = this;
  // The sentinel goes through the same queue so it runs after all real files.
  this._queue.push('', function () {
    self.emit('end');
  });
};
module.exports = FileStream; |
var ListStream = require('./ListStream');
var FileStream = require('./FileStream');
var zlib = require('zlib');

// Emits the path of every entry found in ./files.
var listStream = new ListStream('./files');

// Turns the filename stream into the concatenated contents of those files,
// decompressing each one with a fresh gunzip stream. FileStream calls its
// transform factory with the filename, so wrap createGunzip to avoid passing
// that filename as the zlib options argument.
var fileStream = new FileStream(function createGunzipStream() {
  return zlib.createGunzip();
});

listStream.pipe(fileStream).pipe(process.stderr);
var util = require('util'); | |
var fs = require('fs'); | |
var path = require('path'); | |
var stream = require('stream'); | |
/**
 * Legacy-style readable stream that emits, as 'data' events, the absolute
 * path of every entry in a directory, in sorted order, followed by 'end'.
 *
 * The directory is read asynchronously when the stream is constructed. If it
 * cannot be read, an 'error' event is emitted and no 'end' follows.
 *
 * @param {string} directory Directory to list; resolved against the current
 *   working directory.
 */
function ListStream(directory) {
  directory = path.resolve(process.cwd(), directory);
  stream.Stream.apply(this);
  this.readable = true;
  this.writable = false;
  var self = this;
  fs.readdir(directory, function (err, files) {
    // Check the error first: `files` is undefined when readdir fails.
    if (err) return self.emit('error', err);
    files.sort();
    files.forEach(function (entry) {
      self.emit('data', path.resolve(directory, entry));
    });
    self.emit('end');
  });
}
util.inherits(ListStream, stream.Stream);
module.exports = ListStream; |