Skip to content

Instantly share code, notes, and snippets.

@gabrielf
Created March 22, 2013 13:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gabrielf/5221167 to your computer and use it in GitHub Desktop.
Save gabrielf/5221167 to your computer and use it in GitHub Desktop.
Reformatted changes of https://github.com/pkrumins/node-lazy/pull/33/files to match the project's coding style.
var EventEmitter = require('events').EventEmitter;
var util = require('util');
var stream = require('stream');
/**
 * Lazy list built on top of an EventEmitter.
 *
 * Items travel as `data` events, `end` marks the end of the source, and
 * `pipe` is the completion signal that `join` and `foldr` wait for. Every
 * combinator (filter, map, take, ...) returns a new Lazy fed by this one.
 * May be called with or without `new`.
 *
 * @param {Object} [em] - optional emitter or stream to read from; its data
 *     and end events are forwarded to this Lazy, and this Lazy's pipe event
 *     is forwarded back to it.
 * @param {Object} [opts] - optional event-name overrides: `data`, `pipe`
 *     and `end` (defaults: 'data', 'pipe', 'end'). The same options are
 *     handed to every derived Lazy.
 */
function Lazy(em, opts) {
    if (!(this instanceof Lazy)) return new Lazy(em, opts);
    EventEmitter.call(this);
    var self = this;

    // Instance-level `once`: the wrapper unregisters itself before calling
    // `f`, so `f` runs at most once per registration.
    self.once = function (name, f) {
        self.on(name, function g () {
            self.removeListener(name, g);
            f.apply(this, arguments);
        });
    }

    if (!opts) opts = {};
    var dataName = opts.data || 'data';
    var pipeName = opts.pipe || 'pipe';
    // Read the end event name from `opts.end`. It used to be read from
    // `opts.pipe`, which ignored a custom end name and merged end into pipe
    // whenever a custom pipe name was supplied.
    var endName = opts.end || 'end';

    // Set as soon as the end event is seen. Registered before every other
    // end/pipe listener so `bucket` can tell an end-triggered pipe apart from
    // an early pipe (e.g. one caused by `take`).
    var ended = false;
    self.once(endName, function () { ended = true });

    // Reaching the end implies completion: emit pipe unless it already fired.
    if (pipeName != endName) {
        var piped = false;
        self.once(pipeName, function () { piped = true });
        self.once(endName, function () {
            if (!piped) self.emit(pipeName);
        });
    }

    // Manually feed one item into the list.
    self.push = function (x) {
        self.emit(dataName, x);
    }

    // Manually signal the end of the list.
    self.end = function () {
        self.emit(endName);
    }

    if (em && em.on) {
        em.on(endName, function () {
            self.emit(endName);
        });
        self.on(pipeName, function () {
            em.emit(pipeName);
        });
        // Check for v0.10 or Greater (Stream2 has Duplex type)
        if (stream.Duplex && em instanceof(stream)) {
            em.on('readable', function () {
                // read() returns null once the internal buffer is empty.
                // Drain everything that is buffered and never forward the
                // null sentinel as if it were data.
                var x;
                while ((x = em.read()) !== null) {
                    self.emit(dataName, x);
                }
            });
        } else {
            // Old Stream1 or Event support
            em.on(dataName, function (x) {
                self.emit(dataName, x);
            });
        }
    }

    // Derive a new Lazy from this one: items for which `g` returns a truthy
    // value are forwarded after being transformed by `h`. `g` defaults to
    // "accept everything", `h` to the identity. Pipe is propagated once.
    function newLazy (g, h) {
        if (!g) {
            g = function () {
                return true;
            };
        }
        if (!h) {
            h = function (x) {
                return x;
            };
        }
        var lazy = new Lazy(null, opts);
        self.on(dataName, function (x, y) {
            if (g.call(lazy, x)) {
                lazy.emit(dataName, h(x), y);
            }
        });
        self.once(pipeName, function () {
            lazy.emit(pipeName);
        });
        return lazy;
    }

    // Keep only the items for which `f` returns a truthy value.
    self.filter = function (f) {
        return newLazy(function (x) {
            return f(x);
        });
    }

    // Call `f` for every item; all items are passed on unchanged.
    self.forEach = function (f) {
        return newLazy(function (x) {
            f(x);
            return true;
        });
    }

    // Replace every item by `f(item)`.
    self.map = function (f) {
        return newLazy(
            function () { return true },
            function (x) { return f(x) }
        );
    }

    // Call `f` with the first item only. Returns nothing.
    self.head = function (f) {
        var lazy = newLazy();
        lazy.on(dataName, function g (x) {
            f(x)
            lazy.removeListener(dataName, g)
        })
    }

    // Everything except the first item.
    self.tail = function () {
        var skip = true;
        return newLazy(function () {
            if (skip) {
                skip = false;
                return false;
            }
            return true;
        });
    }

    // Drop the first `n` items.
    self.skip = function (n) {
        return newLazy(function () {
            if (n > 0) {
                n--;
                return false;
            }
            return true;
        });
    }

    // Forward the first `n` items; pipe is emitted on this Lazy when the
    // first surplus item arrives.
    self.take = function (n) {
        return newLazy(function () {
            if (n == 0) self.emit(pipeName);
            return n-- > 0;
        });
    }

    // Forward items while `f` holds. From the first failing item on, nothing
    // is forwarded and pipe is emitted on this Lazy.
    self.takeWhile = function (f) {
        var cond = true;
        return newLazy(function (x) {
            if (cond && f(x)) return true;
            cond = false;
            self.emit(pipeName);
            return false;
        });
    }

    // Fold all items with `op(item, acc)` starting from `i`, then call
    // `f(acc)` once pipe is seen. Returns nothing.
    self.foldr = function (op, i, f) {
        var acc = i;
        var lazy = newLazy();
        lazy.on(dataName, function g (x) {
            acc = op(x, acc);
        });
        lazy.once(pipeName, function () {
            f(acc);
        });
    }

    // Call `f` with the sum of all items.
    self.sum = function (f) {
        return self.foldr(function (x, acc) { return x + acc }, 0, f);
    }

    // Call `f` with the product of all items.
    self.product = function (f) {
        return self.foldr(function (x, acc) { return x*acc }, 1, f);
    }

    // Collect all items into an array and call `f` with it once pipe is
    // seen. Returns this Lazy.
    self.join = function (f) {
        var data = []
        var lazy = newLazy(function (x) {
            data.push(x);
            return true;
        });
        lazy.once(pipeName, function () { f(data) });
        return self;
    }

    // Generic accumulator: `f(acc, item)` is called with `this` bound to a
    // yield function that emits on the returned Lazy, and returns the next
    // accumulator value.
    self.bucket = function (init, f) {
        var lazy = new Lazy(null, opts);
        var yieldTo = function (x) {
            lazy.emit(dataName, x);
        };
        var acc = init;

        // Emit what is still accumulated, at most once. mergeBuffers only
        // returns a value for a non-empty array, so other accumulator types
        // produce no final item.
        var flushed = false;
        var flush = function () {
            if (flushed) return;
            flushed = true;
            var finalBuffer = mergeBuffers(acc);
            if (finalBuffer) {
                yieldTo(finalBuffer);
            }
        };

        self.on(dataName, function (x) {
            acc = f.call(yieldTo, acc, x);
        });
        self.once(pipeName, function () {
            // The end event triggers pipe before the end listener below
            // runs, so flush here first. Otherwise consumers waiting on pipe
            // (join, foldr) would complete without the remainder. An early
            // pipe, with no end seen yet, leaves the accumulator alone.
            if (ended) flush();
            lazy.emit(pipeName);
        });
        // flush on end event
        self.once(endName, flush);
        return lazy;
    }

    // Streams that use this should emit strings or buffers only
    self.__defineGetter__('lines', function () {
        return self.bucket([], function (chunkArray, chunk) {
            var newline = '\n'.charCodeAt(0), lastNewLineIndex = 0;
            if (typeof chunk === 'string') chunk = new Buffer(chunk);
            for (var i = 0; i < chunk.length; i++) {
                if (chunk[i] === newline) {
                    // Only append when this chunk contributes bytes to the
                    // line. Comparing against the previous newline, not 0,
                    // keeps empty slices out of the pending list.
                    if (i > lastNewLineIndex) {
                        chunkArray.push(chunk.slice(lastNewLineIndex, i));
                    }
                    // Wrap all our buffers and emit it. With nothing pending
                    // the line is blank: emit an empty Buffer, not undefined.
                    this(mergeBuffers(chunkArray) || new Buffer(0));
                    lastNewLineIndex = i + 1;
                }
            }
            if (lastNewLineIndex > 0) {
                // New line found in the chunk, push the remaining part of the buffer.
                if (lastNewLineIndex < chunk.length) {
                    chunkArray.push(chunk.slice(lastNewLineIndex));
                }
            } else {
                // No new line found, push the whole buffer.
                if (chunk.length) {
                    chunkArray.push(chunk);
                }
            }
            return chunkArray;
        });
    });
}
/**
 * Create a Lazy that emits a range of integers on the next tick.
 *
 * Accepted call forms:
 *   range(end)                 0 up to, excluding, end
 *   range(start, end[, step])  start towards end, excluding end
 *   range('start..')           infinite, stops once 'pipe' is emitted
 *   range('start..end')
 *   range('start,next..end')   step is |next - start|
 * The string forms may start with '(' (exclude start) or '[' and may end
 * with ']' (include end).
 *
 * The direction is always derived from start and end; only the magnitude of
 * the step is used.
 *
 * @returns {Lazy} a Lazy emitting 'data' for each number and, for finite
 *     ranges, 'end' afterwards.
 * @throws {Error} on an unsupported argument count, a malformed range
 *     string, or a step of zero.
 */
Lazy.range = function () {
    var args = arguments;
    var step = 1;
    var infinite = false;
    var i, j;

    if (args.length === 1 && typeof args[0] === 'number') {
        i = 0;
        j = args[0];
    }
    else if (args.length === 1 && typeof args[0] === 'string') { // 'start[,next]..[end]'
        var arg = args[0];
        var startOpen = false, endClosed = false;
        if (arg[0] === '(' || arg[0] === '[') {
            if (arg[0] === '(') startOpen = true;
            arg = arg.slice(1);
        }
        if (arg.slice(-1) === ']') endClosed = true;

        var parts = arg.split('..');
        if (parts.length !== 2)
            throw new Error("single argument range takes 'start..' or 'start..end' or 'start,next..end'");

        if (parts[1] === '') { // 'start..'
            i = parts[0];
            infinite = true;
        }
        else { // 'start[,next]..end'
            var progression = parts[0].split(',');
            if (progression.length === 1) { // start..end
                i = parts[0];
                j = parts[1];
            }
            else { // 'start,next..end'
                i = progression[0];
                j = parts[1];
                step = Math.abs(progression[1]-i);
            }
        }

        // parseInt ignores a trailing ']' or ')' left on the end part.
        i = parseInt(i, 10);
        j = parseInt(j, 10);

        if (startOpen) {
            if (infinite || i < j) i++;
            else i--;
        }
        if (endClosed) {
            if (i < j) j++;
            else j--;
        }
    }
    else if (args.length === 2 || args.length === 3) { // start, end[, step]
        i = args[0];
        j = args[1];
        if (args.length === 3) {
            // Only the magnitude matters: the loops below pick the direction
            // from start/end. A raw negative step walked away from the end
            // value and never terminated.
            step = Math.abs(args[2]);
        }
    }
    else {
        throw new Error("range takes 1, 2 or 3 arguments");
    }

    // A zero step can never reach the end value; fail fast instead of
    // spinning forever inside the nextTick callback.
    if (step === 0) {
        throw new Error("range step must not be zero");
    }

    var lazy = new Lazy;
    var stopInfinite = false;
    lazy.on('pipe', function () {
        stopInfinite = true;
    });

    if (infinite) {
        // One item per tick so consumers get a chance to emit 'pipe'.
        process.nextTick(function g () {
            if (stopInfinite) return;
            lazy.emit('data', i++);
            process.nextTick(g);
        });
    }
    else {
        // Deferred so the caller can attach listeners first.
        process.nextTick(function () {
            if (i < j) {
                for (; i<j; i+=step) {
                    lazy.emit('data', i)
                }
            }
            else {
                for (; i>j; i-=step) {
                    lazy.emit('data', i)
                }
            }
            lazy.emit('end');
        });
    }
    return lazy;
}
/**
 * Concatenate a list of Buffers into one Buffer and drain the list.
 *
 * @param {Buffer[]} buffers - the pieces to merge, in order. The array is
 *     emptied as a side effect so callers can reuse it as a fresh
 *     accumulator.
 * @returns {Buffer|undefined} the merged Buffer, or undefined when `buffers`
 *     is not a non-empty array.
 */
var mergeBuffers = function mergeBuffers(buffers) {
    // We expect buffers to be a non-empty Array
    if (!Array.isArray(buffers) || !buffers.length) return;

    // Buffer.concat sums the lengths itself. The previous hand-written sum
    // used `length || value`, which for a zero-length Buffer picked the
    // Buffer object and turned the running total into a string.
    var finalBuffer = Buffer.concat(buffers);

    // Keep the original contract: the input array is consumed.
    buffers.length = 0;
    return finalBuffer;
}
// Give Lazy instances the EventEmitter prototype methods (on, emit,
// removeListener) that the constructor body calls on `self`.
util.inherits(Lazy, EventEmitter);
// The constructor is this module's export.
module.exports = Lazy;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment