Created
April 28, 2012 18:51
-
-
Save oleics/2521280 to your computer and use it in GitHub Desktop.
Cool Node.js streams
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var Stream = require('stream').Stream | |
, Lazy = require('lazy') | |
// Demo: two random JSON producers -> parsed -> joined -> filtered, with a
// running count/throughput report for every filtered item.

// timeout < 0: emit_random_data re-schedules itself on the next tick;
// otherwise it waits a random delay in [0, timeout) ms between records.
// rnd values are integers in [0, rndMax).
var timeout = -1
  , rndMax = 10000

// create two streams and emit random JSON data strings (objects)
var s1 = createStream()
  , s2 = createStream()
emit_random_data(s1, 's1', timeout, rndMax)
emit_random_data(s2, 's2', timeout, rndMax)

// convert the JSON-data-streams into streams of parsed JSON-data
// (declared with var so they do not leak as implicit globals)
var s3 = toParsedJsonStream(s1)
  , s4 = toParsedJsonStream(s2)

// join both streams into a fifth stream
var s5 = join(s3, s4)

// filter every object with rnd === 5 in the fifth stream and emit
// them as data in a sixth stream
var s6 = simpleFilter(s5, {rnd: 5})

// right-aligns value in a field of at least `width` characters
// (never truncates, unlike slicing a fixed-width suffix)
function pad(value, width) {
  var str = String(value)
  while (str.length < width) str = ' ' + str
  return str
}

// print what's going on in the fifth and sixth stream
// plus: a silly benchmark
var num_s5 = 0
s5.on('data', function() {
  num_s5++
})

var num_s6 = 0
  , start = Date.now()
  , ips = 0
s6.on('data', function(d) {
  num_s6++
  // at least 1 ms so the very first report cannot divide by zero
  var elapsed = Math.max(Date.now() - start, 1)
  // ips: s5 items per second since start
  ips = ((num_s5 * 1000) / elapsed).toFixed(0)
  console.log(
    's6 %s. <%s> %s items %s items per second'
    , pad(num_s6, 6)
    , d.name
    , pad(num_s5, 9)
    , pad(ips, 9)
  )
})
// FUNCTIONS | |
// Creates a minimal read- and writable pass-through stream: everything
// written to it is re-emitted synchronously as a 'data' event.
//
// write(d) returns true, so Stream#pipe (which pauses its source only when
// write() returns false) never applies backpressure.
// end([d]) optionally writes d, then emits 'end' at most once. Stream#pipe
// calls dest.end() when its source ends, so a stream without end() makes
// that call throw.
function createStream() {
  var s = new Stream()
  s.readable = true
  s.writable = true
  var ended = false
  s.write = function(d) {
    this.emit('data', d)
    return true
  }
  s.end = function(d) {
    if (d !== undefined) this.write(d)
    // Several sources may be piped into one stream (see join()), and each
    // of them calls end(). Emit 'end' only once, and keep writable = true
    // so that the sources still running are not silently dropped by pipe.
    if (ended) return
    ended = true
    this.emit('end')
  }
  return s
}
// Wraps a stream of newline-delimited JSON text and returns a new stream
// that emits one parsed value per line. Parsing happens synchronously while
// the source emits its data, so a line that is not valid JSON makes
// JSON.parse throw back into whoever emitted that chunk.
function toParsedJsonStream(stream) {
  var parsed = createStream()
  function emitParsed(line) {
    parsed.emit('data', JSON.parse(line))
  }
  var lines = Lazy(stream).lines
  lines.map(String).forEach(emitParsed)
  return parsed
}
// Merges any number of streams into one: every stream passed in is piped
// into a fresh stream, which is returned. With exactly one argument, that
// stream itself is returned unchanged.
function join(/* stream1, stream2, ..., streamX */) {
  var count = arguments.length
  // a single stream needs no joining
  if (count === 1) return arguments[0]

  var joined = createStream()
  for (var idx = 0; idx < count; idx++) {
    arguments[idx].pipe(joined)
  }
  return joined
}
// Returns a stream that re-emits only those objects from `stream` whose
// value for every key of `filter` is strictly equal (===) to the filter's
// value. The set of keys is captured once, when simpleFilter is called.
function simpleFilter(stream, filter) {
  var out = createStream()
  var wanted = Object.keys(filter)
  var matches = function(obj) {
    return wanted.every(function(key) {
      return obj[key] === filter[key]
    })
  }
  Lazy(stream)
    .filter(matches)
    .forEach(function(obj) {
      out.emit('data', obj)
    })
  return out
}
// helper: emits random JSON data forever.
// Each call queues one record `{"name":<name>,"rnd":<int in [0, rndMax)>}`
// followed by '\n' on `stream` for the next tick, then schedules the next
// call: on the next tick when timeout < 0, otherwise after a random delay
// of [0, timeout) milliseconds.
function emit_random_data(stream, name, timeout, rndMax) {
  process.nextTick(function() {
    var record = {
      name: name
    , rnd: Math.floor(Math.random()*rndMax)
    }
    stream.emit('data', JSON.stringify(record)+'\n')
  })

  if (timeout < 0) {
    process.nextTick(emit_random_data, stream, name, timeout, rndMax)
    return
  }

  var delay = Math.floor(Math.random()*timeout)
  setTimeout(emit_random_data, delay, stream, name, timeout, rndMax)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// https://github.com/pkrumins/node-lazy | |
var EventEmitter = require('events').EventEmitter; | |
// Export the constructor; every Lazy instance inherits the EventEmitter
// API (on, emit, removeListener, ...) from one shared EventEmitter
// instance installed as Lazy.prototype.
module.exports = Lazy;
Lazy.prototype = new EventEmitter();
/**
 * Lazy: chainable, event-driven list operations over an EventEmitter.
 *
 * Values arrive as `data` events. A `pipe` event marks the end of the
 * sequence; an `end` event is translated into `pipe` if no `pipe` was seen.
 * Every combinator (filter, map, forEach, take, ...) returns a new Lazy fed
 * by this one.
 *
 * May be called with or without `new`.
 *
 * @param {EventEmitter} [em] - source emitter. Its `_events` table is shared
 *   with the new Lazy, so events emitted on `em` reach the Lazy's listeners
 *   directly.
 * @param {Object} [opts] - event-name overrides: `data` (default 'data'),
 *   `pipe` (default 'pipe') and `end` (default 'end').
 */
function Lazy (em, opts) {
    if (!(this instanceof Lazy)) return new Lazy(em, opts);
    var self = this;

    if (em) {
        // NOTE(review): sharing `_events` relies on EventEmitter internals.
        if (!em._events) em._events = {};
        self._events = em._events;
    }
    else {
        // Lazy.prototype is one shared EventEmitter instance, and the
        // EventEmitter constructor gives that instance its own `_events`
        // table. Without this call every Lazy created here would inherit and
        // mutate that single table and see every other Lazy's listeners.
        EventEmitter.call(self);
    }

    // Registers f to run at most once for event `name`, then removes it.
    self.once = function (name, f) {
        self.on(name, function g () {
            self.removeListener(name, g);
            f.apply(this, arguments);
        });
    }

    if (!opts) opts = {};
    var dataName = opts.data || 'data';
    var pipeName = opts.pipe || 'pipe';
    var endName = opts.end || 'end';

    // If the source ends without ever emitting `pipe`, emit `pipe` so that
    // downstream Lazies learn the sequence is complete.
    if (pipeName != endName) {
        var piped = false;
        self.once(pipeName, function () { piped = true });
        self.once(endName, function () {
            if (!piped) self.emit(pipeName);
        });
    }

    // Manually feed one value into this Lazy.
    self.push = function (x) {
        self.emit(dataName, x);
    }

    // Manually signal the end of this Lazy.
    self.end = function () {
        self.emit(endName);
    }

    // Creates a child Lazy that re-emits h(x) for every value x for which
    // g(x) is truthy (g is called with the child as `this`), and forwards
    // this Lazy's `pipe` event to the child.
    function newLazy (g, h) {
        if (!g) g = function () { return true };
        if (!h) h = function (x) { return x };
        var lazy = new Lazy(null, opts);
        self.on(dataName, function (x) {
            if (g.call(lazy, x)) lazy.emit(dataName, h(x));
        });
        self.once(pipeName, function () {
            lazy.emit(pipeName)
        });
        return lazy;
    }

    self.filter = function (f) {
        return newLazy(function (x) {
            return f(x);
        });
    }

    self.forEach = function (f) {
        return newLazy(function (x) {
            f(x);
            return true;
        });
    }

    self.map = function (f) {
        return newLazy(
            function () { return true },
            function (x) { return f(x) }
        );
    }

    // Calls f with the first value only (returns nothing).
    self.head = function (f) {
        var lazy = newLazy();
        lazy.on(dataName, function g (x) {
            f(x)
            lazy.removeListener(dataName, g)
        })
    }

    // Drops the first value.
    self.tail = function () {
        var skip = true;
        return newLazy(function () {
            if (skip) {
                skip = false;
                return false;
            }
            return true;
        });
    }

    // Drops the first n values.
    self.skip = function (n) {
        return newLazy(function () {
            if (n > 0) {
                n--;
                return false;
            }
            return true;
        });
    }

    // Passes the first n values; emits `pipe` on this Lazy once n is used up.
    self.take = function (n) {
        return newLazy(function () {
            if (n == 0) self.emit(pipeName);
            return n-- > 0;
        });
    }

    // Passes values while f holds; on the first failure emits `pipe` on this
    // Lazy and drops everything after.
    self.takeWhile = function (f) {
        var cond = true;
        return newLazy(function (x) {
            if (cond && f(x)) return true;
            cond = false;
            self.emit(pipeName);
            return false;
        });
    }

    // Folds values with op(x, acc) starting from i; calls f(acc) on `pipe`
    // (returns nothing).
    self.foldr = function (op, i, f) {
        var acc = i;
        var lazy = newLazy();
        lazy.on(dataName, function g (x) {
            acc = op(x, acc);
        });
        lazy.once(pipeName, function () {
            f(acc);
        });
    }

    self.sum = function (f) {
        return self.foldr(function (x, acc) { return x + acc }, 0, f);
    }

    self.product = function (f) {
        return self.foldr(function (x, acc) { return x*acc }, 1, f);
    }

    // Collects all values and calls f(array) on `pipe`. Returns this Lazy.
    self.join = function (f) {
        var data = []
        var lazy = newLazy(function (x) {
            data.push(x);
            return true;
        });
        lazy.once(pipeName, function () { f(data) });
        return self;
    }

    // Stateful transform: acc = f.call(emitItem, acc, x) for every value x,
    // where calling `this(value)` inside f emits value downstream. On `end`,
    // whatever mergeBuffers(acc) returns (if anything) is emitted as a last item.
    self.bucket = function (init, f) {
        var lazy = new Lazy(null, opts);
        // Not named `yield`: that is a reserved word in strict-mode code.
        var emitItem = function (x) {
            lazy.emit(dataName, x);
        };
        var acc = init;
        self.on(dataName, function (x) {
            acc = f.call(emitItem, acc, x);
        });
        self.once(pipeName, function () {
            lazy.emit(pipeName)
        });
        // flush on end event
        // NOTE(review): the constructor's end->pipe listener is registered
        // before this one, so downstream Lazies see `pipe` before the flushed
        // final item; confirm whether consumers such as join() need it first.
        self.once(endName, function () {
            var finalBuffer = mergeBuffers(acc);
            if (finalBuffer) emitItem(finalBuffer);
        });
        return lazy;
    }

    // Splits string/Buffer chunks on '\n' and emits each line (without the
    // newline) as a Buffer; a blank line is emitted as an empty Buffer. An
    // unterminated trailing line is emitted when the source emits `end`.
    // Streams that use this should emit strings or buffers only.
    Object.defineProperty(self, 'lines', {
        enumerable: true,
        configurable: true,
        get: function () {
            return self.bucket([], function (chunkArray, chunk) {
                var newline = '\n'.charCodeAt(0), lastNewLineIndex = 0;
                if (typeof chunk === 'string') chunk = Buffer.from(chunk);
                for (var i = 0; i < chunk.length; i++) {
                    if (chunk[i] === newline) {
                        // Only buffer non-empty pieces of the current chunk.
                        if (i > lastNewLineIndex) chunkArray.push(chunk.subarray(lastNewLineIndex, i));
                        // splice() hands over the buffered pieces and resets
                        // the accumulator; nothing buffered means a blank line.
                        var line = mergeBuffers(chunkArray.splice(0, chunkArray.length));
                        this(line || Buffer.alloc(0));
                        lastNewLineIndex = i + 1;
                    }
                }
                // Keep the rest of the chunk (after its last newline) for later.
                if (lastNewLineIndex < chunk.length) chunkArray.push(chunk.subarray(lastNewLineIndex));
                return chunkArray;
            });
        }
    });
}
/**
 * Lazy.range: returns a Lazy that emits a sequence of numbers as `data`
 * events on a later tick, followed by `end` (finite ranges only).
 *
 * Forms:
 *   range(end)                - 0, 1, ..., end - 1
 *   range(start, end[, step]) - from start towards end, end excluded
 *   range('start..')          - start, start + 1, ... until the Lazy emits 'pipe'
 *   range('start..end')       - from start towards end, end excluded
 *   range('start,next..end')  - step is |next - start|
 * In the string forms a leading '(' excludes start and a trailing ']'
 * includes end.
 *
 * The direction is chosen by comparing start and end; only the magnitude
 * of step is used.
 *
 * @throws {Error} for a wrong number of arguments, a malformed string
 *   range, or a zero step.
 */
Lazy.range = function () {
    var args = arguments;
    var step = 1;
    var infinite = false;
    var i, j;

    if (args.length == 1 && typeof args[0] == 'number') {
        i = 0;
        j = args[0];
    }
    else if (args.length == 1 && typeof args[0] == 'string') { // 'start[,next]..[end]'
        var arg = args[0];
        var startOpen = false, endClosed = false;
        if (arg[0] == '(' || arg[0] == '[') {
            if (arg[0] == '(') startOpen = true;
            arg = arg.slice(1);
        }
        if (arg.slice(-1) == ']') endClosed = true;

        var parts = arg.split('..');
        if (parts.length != 2)
            throw new Error("single argument range takes 'start..' or 'start..end' or 'start,next..end'");

        if (parts[1] == '') { // 'start..'
            i = parts[0];
            infinite = true;
        }
        else { // 'start[,next]..end'
            var progression = parts[0].split(',');
            if (progression.length == 1) { // start..end
                i = parts[0];
                j = parts[1];
            }
            else { // 'start,next..end'
                i = progression[0];
                j = parts[1];
                step = Math.abs(progression[1]-i);
            }
        }

        // parseInt ignores a trailing ']' or ')' left on the end bound.
        i = parseInt(i, 10);
        j = parseInt(j, 10);

        if (startOpen) {
            if (infinite || i < j) i++;
            else i--;
        }

        if (endClosed) {
            if (i < j) j++;
            else j--;
        }
    }
    else if (args.length == 2 || args.length == 3) { // start, end[, step]
        i = args[0];
        j = args[1];
        if (args.length == 3) step = args[2];
    }
    else {
        throw new Error("range takes 1, 2 or 3 arguments");
    }

    // The loops below always walk from i towards j, so a negative step
    // (e.g. range(10, 0, -1)) or a zero step would never reach j and would
    // loop forever. Use the magnitude and reject zero.
    step = Math.abs(step);
    if (step === 0) throw new Error("range step must not be zero");

    var lazy = new Lazy;
    var stopInfinite = false;
    // A consumer signals "enough" (e.g. take/takeWhile) by emitting 'pipe'.
    lazy.on('pipe', function () {
        stopInfinite = true;
    });

    if (infinite) {
        // One value per tick so other work can run between items.
        process.nextTick(function g () {
            if (stopInfinite) return;
            lazy.emit('data', i++);
            process.nextTick(g);
        });
    }
    else {
        process.nextTick(function () {
            if (i < j) {
                for (; i<j; i+=step) {
                    lazy.emit('data', i)
                }
            }
            else {
                for (; i>j; i-=step) {
                    lazy.emit('data', i)
                }
            }
            lazy.emit('end');
        });
    }

    return lazy;
}
/**
 * Concatenates an array of Buffers into a single new Buffer.
 *
 * @param {Buffer[]} buffers - pieces to join, in order. The array is emptied
 *   as a side effect; the `lines` splitter in this module reuses its
 *   accumulator array after calling this and may rely on that reset.
 * @returns {Buffer|undefined} the concatenation, or undefined when
 *   `buffers` is not an array or is empty.
 */
var mergeBuffers = function mergeBuffers(buffers) {
    // We expect buffers to be a non-empty Array
    if (!Array.isArray(buffers) || buffers.length === 0) return;
    // splice() removes every element (emptying the caller's array, exactly
    // like draining it with shift()) and returns them in order. Buffer.concat
    // sums the real lengths, so zero-length pieces are handled correctly.
    return Buffer.concat(buffers.splice(0, buffers.length));
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment