Skip to content

Instantly share code, notes, and snippets.

@oleics
Created April 28, 2012 18:51
Show Gist options
  • Save oleics/2521280 to your computer and use it in GitHub Desktop.
Save oleics/2521280 to your computer and use it in GitHub Desktop.
Cool nodejs streams
var Stream = require('stream').Stream
, Lazy = require('lazy')
// timeout < 0 makes emit_random_data reschedule itself without a timer;
// otherwise it is the upper bound (in ms) of the random delay between items
var timeout = -1
  , rndMax = 10000 // exclusive upper bound of the random `rnd` value

// create two streams and emit random JSON data strings (objects)
var s1 = createStream()
  , s2 = createStream()
emit_random_data(s1, 's1', timeout, rndMax)
emit_random_data(s2, 's2', timeout, rndMax)

// convert the JSON-data-streams into streams of parsed JSON-data
// (declared with `var` so they do not leak as implicit globals)
var s3 = toParsedJsonStream(s1)
  , s4 = toParsedJsonStream(s2)

// join both streams into a fifth stream
var s5 = join(s3, s4)

// filter every object with rnd == 5 in the fifth stream and emit
// them as data in a sixth stream
var s6 = simpleFilter(s5, {rnd: 5})

// right-aligns `value` in a column that is `width` characters wide;
// longer values are printed in full instead of being cut off
function padLeft(value, width) {
  var text = String(value)
  while(text.length < width) {
    text = ' ' + text
  }
  return text
}

// print whats going on in the fifth and sixth stream
// plus: a silly benchmark
var num_s5 = 0
s5.on('data', function() {
  num_s5++
})

var num_s6 = 0
  , start = Date.now()
  , ips = 0
s6.on('data', function(d) {
  num_s6++
  // clamp to 1ms so a match within the first millisecond does not
  // divide by zero and print "Infinity"
  var elapsedMs = Math.max(Date.now() - start, 1)
  ips = ((num_s5 * 1000) / elapsedMs).toFixed(0)
  console.log(
    's6 %s. <%s> %s items %s items per second'
  , padLeft(num_s6, 6)
  , d.name
  , padLeft(num_s5, 9)
  , padLeft(ips, 9)
  )
})
// FUNCTIONS
/**
 * creates a read- and writable pass-through stream
 *
 * Everything written to the stream is re-emitted unchanged as a 'data'
 * event. The stream can be used as a `pipe()` destination: `write` never
 * signals back-pressure and `end` closes the writable side.
 *
 * @returns {Stream} stream with `readable`/`writable` set to true
 */
function createStream() {
  var s = new Stream()
  s.readable = true
  s.writable = true

  s.write = function(d) {
    this.emit('data', d)
    // data is handed on synchronously, so there is never a reason to pause
    return true
  }

  // Stream.prototype.pipe calls `dest.end()` when its source emits 'end';
  // without this method that call throws a TypeError
  s.end = function(d) {
    if(!this.writable) {
      return // already ended, do not emit 'end' twice
    }
    if(arguments.length > 0) {
      this.write(d)
    }
    this.writable = false
    this.emit('end')
  }

  return s
}
/**
 * creates a stream that emits the parsed JSON data of another stream
 *
 * The source is split into lines, each line is converted to a string and
 * run through JSON.parse; the parsed value is emitted as 'data' on the
 * returned stream.
 *
 * @param {Stream} stream source emitting newline-terminated JSON text
 * @returns {Stream} stream of parsed values
 */
function toParsedJsonStream(stream) {
  var parsed = createStream()
  var textLines = Lazy(stream).lines.map(String)

  textLines.forEach(function(line) {
    parsed.emit('data', JSON.parse(line))
  })

  return parsed
}
/**
 * joins multiple streams into one stream
 *
 * Accepts any number of streams (stream1, stream2, ..., streamX). With
 * exactly one argument that stream is returned as-is; otherwise every
 * argument is piped into a newly created stream, which is returned.
 * The merged stream is not ended by this function.
 *
 * @returns {Stream} the single input stream, or the merged stream
 */
function join(/* stream1, stream2, ..., streamX */) {
  // if we got only one stream, return it
  if(arguments.length === 1) {
    return arguments[0]
  }

  var s = createStream()
  Array.prototype.slice.call(arguments, 0).forEach(function(v) {
    // `end: false`: one source finishing must not end the merged stream
    // while the other sources are still emitting
    v.pipe(s, {end: false})
  })
  return s
}
/**
 * creates a stream that emits only matching objects of a stream of objects
 *
 * An object matches when, for every own enumerable key of `filter`, its
 * property is strictly equal to the filter's value.
 *
 * @param {Stream} stream source emitting objects as 'data'
 * @param {Object} filter key/value pairs every emitted object must carry
 * @returns {Stream} stream of the matching objects
 */
function simpleFilter(stream, filter) {
  var matches = createStream()
  var wanted = Object.keys(filter)

  function isMatch(candidate) {
    return wanted.every(function(prop) {
      return candidate[prop] === filter[prop]
    })
  }

  Lazy(stream)
    .filter(isMatch)
    .forEach(function(hit) {
      matches.emit('data', hit)
    })

  return matches
}
/**
 * helper: emits random json-data
 *
 * Emits one line of JSON (`{"name": ..., "rnd": ...}` followed by '\n') as
 * a 'data' event on `stream` and then reschedules itself, so the stream
 * keeps producing items until `stream.readable` is set to `false`.
 *
 * @param {Object} stream  emitter to call `emit('data', line)` on
 * @param {String} name    value of the `name` field of every item
 * @param {Number} timeout negative: reschedule on the next event loop turn;
 *                         otherwise: wait a random delay below `timeout` ms
 * @param {Number} rndMax  `rnd` is an integer in [0, rndMax)
 */
function emit_random_data(stream, name, timeout, rndMax) {
  // gives consumers a way to stop the otherwise endless generator
  if(stream.readable === false) {
    return
  }

  // deferred to the next tick, so listeners attached synchronously after
  // this call still receive the item
  process.nextTick(function() {
    stream.emit('data', JSON.stringify({
      name: name
    , rnd: Math.floor(Math.random()*rndMax)
    })+'\n')
  })

  if(timeout < 0) {
    // setImmediate instead of a recursive process.nextTick: the next-tick
    // queue is drained before I/O and timers run, so rescheduling there
    // forever would starve the event loop
    setImmediate(emit_random_data, stream, name, timeout, rndMax)
  } else {
    setTimeout(
      emit_random_data
    , Math.floor(Math.random()*timeout)
    , stream, name, timeout, rndMax
    )
  }
}
// https://github.com/pkrumins/node-lazy
var EventEmitter = require('events').EventEmitter;
// Lazy inherits the EventEmitter API (on/emit/removeListener) through its
// prototype. `Lazy` is a function declaration further down in this file, so
// it is already defined at this point (hoisting).
Lazy.prototype = new EventEmitter;
// The constructor itself is the module's export; it can be called with or
// without `new`.
module.exports = Lazy;
/**
 * Lazy list on top of an event emitter.
 *
 * Values arrive as data events; every combinator (filter, map, take, ...)
 * registers a listener and returns a new Lazy that receives the derived
 * values. A "pipe" event marks the point where the list is complete and is
 * what the folding helpers (foldr, sum, product, join) wait for. The end
 * event of the source is translated into a pipe event if none was seen.
 *
 * Can be called with or without `new`.
 *
 * @param {EventEmitter} [em]  source emitter; the new instance shares its
 *     listener table (`_events`), so listening/emitting on the Lazy is
 *     listening/emitting on `em`
 * @param {Object} [opts]      event names to use instead of the defaults:
 *     `opts.data` ('data'), `opts.pipe` ('pipe'), `opts.end` ('end')
 */
function Lazy (em, opts) {
    if (!(this instanceof Lazy)) return new Lazy(em, opts);

    // Initialise per-instance emitter state. The prototype is itself an
    // EventEmitter instance; on Node.js versions whose EventEmitter creates
    // `_events` eagerly, instances without their own state would all share
    // the prototype's listener table and chained lists would feed themselves.
    EventEmitter.call(this);

    var self = this;
    if (em) {
        if (!em._events) em._events = {};
        self._events = em._events;
    }

    // Own `once`: removes the wrapper before calling `f`.
    self.once = function (name, f) {
        self.on(name, function g () {
            self.removeListener(name, g);
            f.apply(this, arguments);
        });
    };

    if (!opts) opts = {};
    var dataName = opts.data || 'data';
    var pipeName = opts.pipe || 'pipe';
    // Was read from `opts.pipe`, which made a custom end event impossible
    // and disabled the end -> pipe bridge below whenever `opts.pipe` was set.
    var endName = opts.end || 'end';

    if (pipeName != endName) {
        // If the source ends without ever emitting a pipe event, emit one so
        // that the folding helpers fire.
        var piped = false;
        self.once(pipeName, function () { piped = true });
        self.once(endName, function () {
            if (!piped) self.emit(pipeName);
        });
    }

    // Emits a value on this list.
    self.push = function (x) {
        self.emit(dataName, x);
    };

    // Emits the end event on this list.
    self.end = function () {
        self.emit(endName);
    };

    // Derives a child list: every value for which g(x) is truthy (called
    // with the child as `this`) is forwarded as h(x); the first pipe event
    // is forwarded as well. Defaults: accept everything, forward unchanged.
    function newLazy (g, h) {
        if (!g) g = function () { return true };
        if (!h) h = function (x) { return x };
        var lazy = new Lazy(null, opts);
        self.on(dataName, function (x) {
            if (g.call(lazy, x)) lazy.emit(dataName, h(x));
        });
        self.once(pipeName, function () {
            lazy.emit(pipeName)
        });
        return lazy;
    }

    // Keeps only the values for which f(x) is truthy.
    self.filter = function (f) {
        return newLazy(function (x) {
            return f(x);
        });
    };

    // Calls f for every value and passes the value on unchanged.
    self.forEach = function (f) {
        return newLazy(function (x) {
            f(x);
            return true;
        });
    };

    // Replaces every value x by f(x).
    self.map = function (f) {
        return newLazy(
            function () { return true },
            function (x) { return f(x) }
        );
    };

    // Calls f with the first value only; returns nothing.
    self.head = function (f) {
        var lazy = newLazy();
        lazy.on(dataName, function g (x) {
            f(x)
            lazy.removeListener(dataName, g)
        })
    };

    // Everything but the first value.
    self.tail = function () {
        var skip = true;
        return newLazy(function () {
            if (skip) {
                skip = false;
                return false;
            }
            return true;
        });
    };

    // Drops the first n values.
    self.skip = function (n) {
        return newLazy(function () {
            if (n > 0) {
                n--;
                return false;
            }
            return true;
        });
    };

    // Passes the first n values; the value after them triggers a pipe event
    // on this list.
    self.take = function (n) {
        return newLazy(function () {
            if (n == 0) self.emit(pipeName);
            return n-- > 0;
        });
    };

    // Passes values while f(x) is truthy; every later value is dropped and
    // triggers a pipe event on this list.
    self.takeWhile = function (f) {
        var cond = true;
        return newLazy(function (x) {
            if (cond && f(x)) return true;
            cond = false;
            self.emit(pipeName);
            return false;
        });
    };

    // Folds all values with op(x, acc), starting at i, and hands the result
    // to f once the pipe event arrives; returns nothing.
    self.foldr = function (op, i, f) {
        var acc = i;
        var lazy = newLazy();
        lazy.on(dataName, function g (x) {
            acc = op(x, acc);
        });
        lazy.once(pipeName, function () {
            f(acc);
        });
    };

    self.sum = function (f) {
        return self.foldr(function (x, acc) { return x + acc }, 0, f);
    };

    self.product = function (f) {
        return self.foldr(function (x, acc) { return x*acc }, 1, f);
    };

    // Collects all values into an array and hands it to f on the pipe event;
    // returns this list (not the collecting child).
    self.join = function (f) {
        var data = []
        var lazy = newLazy(function (x) {
            data.push(x);
            return true;
        });
        lazy.once(pipeName, function () { f(data) });
        return self;
    };

    // Generic re-chunking: f(acc, x) is called for every value with an
    // emit function as `this` and returns the next accumulator. On the end
    // event whatever mergeBuffers makes of the accumulator is flushed.
    self.bucket = function (init, f) {
        var lazy = new Lazy(null, opts);
        // Named `emitChunk` rather than `yield`, which is a reserved word in
        // strict mode and ES modules.
        var emitChunk = function (x) {
            lazy.emit(dataName, x);
        };

        var acc = init;

        self.on(dataName, function (x) {
            acc = f.call(emitChunk, acc, x);
        });

        self.once(pipeName, function () {
            lazy.emit(pipeName)
        });

        // flush on end event
        self.once(endName, function () {
            var finalBuffer = mergeBuffers(acc);
            if (finalBuffer) emitChunk(finalBuffer);
        });

        return lazy;
    };

    // Splits the incoming chunks into lines (without the '\n'), emitted as
    // buffers. Streams that use this should emit strings or buffers only.
    // Same attributes as the former __defineGetter__ call: enumerable and
    // configurable accessor.
    Object.defineProperty(self, 'lines', {
        enumerable: true,
        configurable: true,
        get: function () {
            return self.bucket([], function (chunkArray, chunk) {
                var newline = '\n'.charCodeAt(0), lastNewLineIndex = 0;
                if (typeof chunk === 'string') chunk = Buffer.from(chunk);

                for (var i = 0; i < chunk.length; i++) {
                    if (chunk[i] === newline) {
                        // If we have content from the current chunk to append to our buffers, do it.
                        if (i > 0) chunkArray.push(chunk.slice(lastNewLineIndex, i));

                        // Wrap all our buffers and emit it (`this` is the emit function).
                        // NOTE(review): a newline at index 0 with nothing buffered emits
                        // `undefined`, because mergeBuffers returns nothing for an empty list.
                        this(mergeBuffers(chunkArray));
                        lastNewLineIndex = i + 1;
                    }
                }

                if (lastNewLineIndex > 0) {
                    // New line found in the chunk, push the remaining part of the buffer.
                    if (lastNewLineIndex < chunk.length) chunkArray.push(chunk.slice(lastNewLineIndex));
                } else {
                    // No new line found, push the whole buffer.
                    if (chunk.length) chunkArray.push(chunk);
                }
                return chunkArray;
            });
        }
    });
}
/**
 * Creates a lazy list of numbers.
 *
 * Accepted forms:
 *   range(end)                 0 up to (excluding) end
 *   range(start, end[, step])  start up to/down to (excluding) end
 *   range('start..end')        optional '(' / '[' prefix and ']' suffix
 *   range('start,next..end')   step is |next - start|
 *   range('start..')           infinite, counting up by one until a 'pipe'
 *                              event is emitted on the returned list
 *
 * Values are emitted asynchronously (from the next tick on). Finite ranges
 * emit 'end' after the last value. The step's sign is ignored; the
 * direction follows from start and end.
 *
 * @returns {Lazy} list emitting the numbers as 'data'
 * @throws {Error} on an unsupported argument list, a malformed range
 *     string, or a step that is zero or not a number
 */
Lazy.range = function () {
    var args = arguments;
    var step = 1;
    var infinite = false;
    var i, j;

    if (args.length == 1 && typeof args[0] == 'number') {
        i = 0;
        j = args[0];
    }
    else if (args.length == 1 && typeof args[0] == 'string') { // 'start[,next]..[end]'
        var arg = args[0];
        var startOpen = false, endClosed = false;
        if (arg[0] == '(' || arg[0] == '[') {
            if (arg[0] == '(') startOpen = true;
            arg = arg.slice(1);
        }
        if (arg.slice(-1) == ']') endClosed = true;

        var parts = arg.split('..');
        if (parts.length != 2)
            throw new Error("single argument range takes 'start..' or 'start..end' or 'start,next..end'");

        if (parts[1] == '') { // 'start..'
            i = parts[0];
            infinite = true;
        }
        else { // 'start[,next]..end'
            var progression = parts[0].split(',');
            i = progression[0];
            j = parts[1];
            if (progression.length != 1) { // 'start,next..end'
                step = Math.abs(progression[1] - i);
            }
        }

        i = parseInt(i, 10);
        j = parseInt(j, 10);

        if (startOpen) {
            if (infinite || i < j) i++;
            else i--;
        }

        if (endClosed) {
            if (i < j) j++;
            else j--;
        }
    }
    else if (args.length == 2 || args.length == 3) { // start, end[, step]
        i = args[0];
        j = args[1];
        if (args.length == 3) {
            step = args[2];
        }
    }
    else {
        throw new Error("range takes 1, 2 or 3 arguments");
    }

    // The loops below add/subtract `step` depending on the direction, so
    // only its magnitude matters. A zero or NaN step would never reach the
    // end and spin forever, hence it is rejected up front.
    step = Math.abs(step);
    if (!(step > 0)) {
        throw new Error("range step must be a non-zero number");
    }

    var lazy = new Lazy;
    var stopInfinite = false;
    lazy.on('pipe', function () {
        stopInfinite = true;
    });

    if (infinite) {
        // one value per tick until somebody signals 'pipe'
        process.nextTick(function g () {
            if (stopInfinite) return;
            lazy.emit('data', i++);
            process.nextTick(g);
        });
    }
    else {
        process.nextTick(function () {
            if (i < j) {
                for (; i < j; i += step) {
                    lazy.emit('data', i)
                }
            }
            else {
                for (; i > j; i -= step) {
                    lazy.emit('data', i)
                }
            }
            lazy.emit('end');
        });
    }

    return lazy;
}
/**
 * Concatenates a list of buffers into one buffer and empties the list.
 *
 * The input array is drained on purpose: the `lines` getter keeps using the
 * same array as its accumulator after every merge.
 *
 * @param {Buffer[]} buffers list of buffers to merge (emptied by the call)
 * @returns {Buffer|undefined} the merged buffer, or undefined when
 *     `buffers` is not a non-empty array
 */
var mergeBuffers = function mergeBuffers(buffers) {
    // We expect buffers to be a non-empty Array
    if (!buffers || !Array.isArray(buffers) || !buffers.length) return;

    // Buffer.concat sums the lengths itself. The former hand-written sum used
    // `right.length || right`, which for an empty buffer added the buffer
    // object instead of 0 and turned the total into a string.
    return Buffer.concat(buffers.splice(0, buffers.length));
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment