Skip to content

Instantly share code, notes, and snippets.

@Raynos
Created June 25, 2013 02:36
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Raynos/5855478 to your computer and use it in GitHub Desktop.
Save Raynos/5855478 to your computer and use it in GitHub Desktop.
var Transform = require("stream").Transform
var Writable = require("stream").Writable
module.exports = mapAsync
/**
 * Build an object-mode Transform stream from callback-style handlers.
 *
 * @param {Function} write - Called as `write(value, cb)` for every chunk
 *   written to the stream, with `this` set to the stream so the handler can
 *   emit results via `this.push(...)`. `cb` is the Transform's own callback
 *   and must be called once the chunk has been handled.
 * @param {Function} [end] - Optional. Called as `end(cb)` when the stream is
 *   flushed, also with `this` set to the stream. When omitted, the flush
 *   completes immediately.
 * @returns {Transform} The configured stream.
 */
function mapAsync(write, end) {
  var stream = new Transform({ objectMode: true })

  stream._transform = function (value, _, cb) {
    // Bind `this` to the stream: a bare `write(value, cb)` call would leave
    // `this` as undefined/global and break handlers that use `this.push`.
    write.call(stream, value, cb)
  }

  stream._flush = function (cb) {
    if (end) end.call(stream, cb)
    else cb()
  }

  return stream
}
// To create a "through" stream, use mapAsync.
var stream = mapAsync(function (word, cb) {
  // Handing the result to cb as its second argument both puts the object in
  // the stream's buffer and pulls in the next value, without relying on how
  // `this` is bound inside the handler.
  cb(null, word.toUpperCase());
});

// To consume values from the stream, use a Writable sink;
// it handles buffering / backpressure etc.
var sink = new Writable({ objectMode: true });
sink._write = function (chunk, _, cb) {
  console.log("got chunk", chunk);
  cb();
};

// Write as you normally would.
stream.write("hello");
stream.write("world");

// Notice that the sink does not miss a chunk even though it is only
// attached after the writes above.
stream.pipe(sink);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment