-
-
Save Raynos/5145d6f5091335654614 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var array = function (list) { | |
var index = 0 | |
return readstream(function (writable, recurse) { | |
if (index === list.length) { | |
// null is end of thing | |
writable(null) | |
} | |
writable(list[index++], recurse) | |
}) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
function buffer(readable) { | |
var buffer = [] | |
var highWaterMark = 100 | |
var paused = false | |
function drainBuffer(writable) { | |
var item = buffer. | |
} | |
return function stream(writable) { | |
readable(function (chunk, recurse) { | |
// the source has halted. We should drain this buffer into | |
// writable | |
if (chunk === null || chunk instanceof Error) { | |
return drainBuffer(writable) | |
} | |
// always push onto buffer | |
buffer.push(chunk) | |
// If paused then DO NOT WRITE TO WRITABLE | |
if (paused) { | |
if (buffer.length < highWaterMark) { | |
// let's just pre-emptively pull some stuff out of source | |
recurse() | |
} | |
return | |
} | |
// pause right before writing to writable | |
paused = true | |
// write item to writable! | |
var item = buffer.shift() | |
writable(item, function () { | |
// unpause once writable says I WANT MORE | |
paused = false | |
// tell source we want more | |
recurse() | |
}) | |
}) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var fileR = reader("./package.json") | |
var fileW = writer("./package.json") | |
var file = duplex(fileR, fileW) | |
file(function (chunk, recurse) { | |
// I AM READING FROM FILE | |
}) | |
file("LOL DATA", function (er) { | |
// I HAVE WRITTEN LOL DATA TO FILE | |
}) | |
function duplex(readable, writable) { | |
return function (chunk, recurse) { | |
// called as a readable | |
if (typeof chunk === "function") { | |
readable(chunk) | |
} else { | |
writable(chunk, recurse) | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// SO SIMPLE | |
function map(readable, mapper) { | |
return function (writable) { | |
readable(function (chunk, recurse) { | |
writable(mapper(chunk), recurse) | |
}) | |
} | |
} | |
// WINNING | |
function asyncMap(readable, mapper) { | |
return function (writable) { | |
readable(function (chunk, recurse) { | |
mapper(chunk, function (result) { | |
writable(result, recurse) | |
}) | |
}) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// http://jsfiddle.net/y5zUc/2/ | |
var stream = readstream(function (writable, recurse) { | |
// we write some chunk to writable which will call our callback | |
// when it has flushed it. | |
writable("some chunk", recurse) | |
}) | |
stream(writable) | |
// a writable is a function that takes chunks and writes it somewhere | |
function writable(chunk, recurse) { | |
// do somethign with chunk | |
console.log("written chunk", chunk) | |
// pull more data out of source in 1s | |
setTimeout(recurse, 1000) | |
} | |
function readstream(readable) { | |
return function (writable) { | |
// we pump from readable to writer immediately | |
recurse() | |
// we pass readable the writer & loop so that it can write | |
// some more | |
function recurse() { | |
readable(writable, recurse) | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
function cursorStream(opts) { | |
var source | |
return readstream(function (writable, recurse) { | |
if (!source) { | |
source = db.cursor(opts) | |
} | |
cursor.nextObject(function (er, item) { | |
if (er) return writable(er) | |
if (item === null) return writable(null) | |
writable(item, recurse) | |
}) | |
}) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
function socketStream(uri) { | |
return function readable(writable) { | |
var socket = connect(uri) | |
socket.readStart() | |
socket.ondata = function (chunk) { | |
writable(chunk, function () { | |
socket.readStart() | |
}) | |
socket.readStop() | |
} | |
socket.onend = function () { | |
writable(null) | |
} | |
socket.onerror = function (er) { | |
writable(er) | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var fs = require("fs") | |
var file = reader("./package.json") | |
file(function (chunk, recurse) { | |
/* got chunk of file */ | |
if (chunk === null) { | |
/* EOF */ | |
} else if (chunk instanceof Error) { | |
/* OOPS */ | |
} else { | |
/* file is a stream of buffers. So this is a buffer */ | |
// ensure you either always recurse immediately or hold the recurse | |
// callback so that you can call it later when your ready to | |
// pull more from source | |
recurse() | |
} | |
}) | |
function reader(filePath) { | |
var fd, index = 0 | |
var withClose = function (recurse) { | |
return function (value) { | |
if (value === "STOP") { | |
/* close file descriptor */ | |
} else { | |
recurse() | |
} | |
} | |
} | |
return readstream(function (writable, recurse) { | |
recurse = withClose(recurse) | |
if (!fd) { | |
fs.open(filePath, "r+", function (er, _fd) { | |
if (er) return writable(er) | |
fd = _fd | |
read(fd, writable, recurse) | |
}) | |
} else { | |
read(fd, writable, recurse) | |
} | |
}) | |
function read(fd, writable, recurse) { | |
var buffer = new Buffer(8192) | |
fs.read(fd, buffer, index, 8192, -1, function (er, bytesRead) { | |
if (er) return writable(er) | |
if (bytesRead === 0) { | |
return writable(null) | |
} | |
index += bytesRead | |
writable(buffer, recurse) | |
}) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment