-
-
Save Raynos/e771cbd9d6b188972c74 to your computer and use it in GitHub Desktop.
Recurse Stream 2.0
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// http://jsfiddle.net/zG99A/10/ | |
// | |
// Readable is a function(Function:Writable writable, Boolean isClosed) | |
// Writable is a function(Object chunk, Function:Readable recurse) | |
// Duplex is a function(Function:Readable readable) return Function:Readable | |
function createReadArray(list) { | |
var i = 0; | |
return function readable(writable, isClosed) { | |
return isClosed ? null : | |
i === list.length ? writable(null) : | |
writable(list[i++], readable) | |
} | |
} | |
function logger(chunk, recurse) { | |
console.log("chunk", chunk) && recurse && recurse(logger) | |
} | |
var double = map(function (i) { return i * 2 }) | |
var square = map(function (i) { return i * i }) | |
square(double(createReadArray([1, 2, 3])))(logger) | |
pipeable(createReadArray([1, 2, 3])) | |
.pipe(double) | |
.pipe(square) | |
(logger) | |
pipeable(createReadArray([1, 2, 3, 4])) | |
.pipe(buffer(5)) | |
.pipe(map(function double(x) { | |
return x * 2 | |
})) | |
.pipe(filter(function (x) { | |
return x !== 4 | |
})) | |
.pipe(takeWhile(function (x) { | |
return x < 7 | |
})) | |
(logger) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
function buffer(highWaterMark) { | |
highWaterMark = highWaterMark === undefined ? 100 : highWaterMark | |
return function duplex(source) { | |
var list = [] | |
var paused = false | |
var ended = false | |
var closed = false | |
function drainBuffer(writable, isClosed) { | |
if (list.length === 0 || closed) { | |
return | |
} | |
var item = list.shift() | |
writable(item, drainBuffer) | |
} | |
return function readable(destination, isClosed) { | |
source(function writable(chunk) { | |
if (closed) { | |
return | |
} else if (!ended) { | |
list.push(chunk) | |
} | |
// the source has halted. We should drain this buffer into | |
// writable | |
if (chunk === null || chunk instanceof Error) { | |
ended = true | |
return drainBuffer(destination) | |
} else if (paused) { | |
return buffer.length < highWaterMark ? | |
readable(destination) : null | |
} | |
// pause right before writing to writable | |
paused = true | |
// write item to writable! | |
var item = list.shift() | |
writable(item, function (destination, isClosed) { | |
if (isClosed) { | |
closed = true | |
} | |
// unpause once writable says I WANT MORE | |
paused = false | |
// if ended then drain else tell source we want more | |
ended ? drainBuffer(destination, isClosed) : | |
readable(destination, isClosed) | |
}) | |
}, isClosed) | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// readable :: function (writable, isClosed) | |
// writable :: function (end, data) | |
// duplex :: function (readable) return readable | |
depthFirst(process.cwd(), function (dir) { | |
return readList(function (callback) { | |
fs.readdir(dir, function (err, files) { | |
if (err) { | |
return err.code === "ENOTDIR" ? callback(null, []) : callback(err) | |
} | |
callback(null, files) | |
}) | |
}) | |
}) | |
function readList(getList) { | |
var list | |
return function readable(writable, isClosed) { | |
if (isClosed) return | |
if (list) return send() | |
getList(function (err, result) { | |
if (err) { | |
return writable(err) | |
} | |
list = result | |
send() | |
}) | |
function send() { | |
var item = list.shift() | |
if (!item) return writable(true) | |
return writable(null, item) | |
} | |
} | |
} | |
function depthFirst(initial, createReadable) { | |
var streams = [createReadable(initial)] | |
return function readable(writable, isClosed) { | |
if (isClosed) return streams.forEach(function (s) { | |
s(null, true) | |
}) | |
if (streams.length === 0) return writable(true) | |
streams[0](function (err, chunk) { | |
if (err === true) { | |
streams.shift() | |
return readable(writable) | |
} else if (err) { | |
return writable(err) | |
} | |
streams.push(createReadable(chunk)) | |
writable(null, chunk) | |
}) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
function Duplex(process) { | |
return function duplex(source) { | |
return function readable(destination, isClosed) { | |
source(function writable(chunk) { | |
chunk === null ? destination(chunk) : | |
chunk instanceof Error ? destination(chunk) : | |
process(chunk, destination, readable) | |
}, isClosed) | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// pipe takes a readable source and adds a pipe method | |
// which can be called with a duplex and it will return | |
// a new readable from the duplex which also has pipe method! | |
function pipeable(readable) { | |
readable.pipe = function (duplex) { | |
return pipeable(duplex(readable)) | |
} | |
return readable | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
function map(lambda) { | |
return Duplex(function process(chunk, dest, recurse) { | |
dest(lambda(chunk), recurse) | |
}) | |
} | |
function filter(lambda) { | |
return Duplex(function process(chunk, dest, recurse) { | |
lambda(chunk) ? dest(chunk, recurse) : recurse(dest) | |
}) | |
} | |
function takeWhile(lambda) { | |
return Duplex(function process(chunk, dest, recurse) { | |
lambda(chunk) ? dest(chunk, recurse) : recurse(null, true) | |
}) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment