Skip to content

Instantly share code, notes, and snippets.

@Raynos Raynos/array-stream.js Secret
Last active Dec 15, 2015

Embed
What would you like to do?
var array = function (list) {
var index = 0
return readstream(function (writable, recurse) {
if (index === list.length) {
// null is end of thing
writable(null)
}
writable(list[index++], recurse)
})
}
function buffer(readable) {
var buffer = []
var highWaterMark = 100
var paused = false
function drainBuffer(writable) {
var item = buffer.
}
return function stream(writable) {
readable(function (chunk, recurse) {
// the source has halted. We should drain this buffer into
// writable
if (chunk === null || chunk instanceof Error) {
return drainBuffer(writable)
}
// always push onto buffer
buffer.push(chunk)
// If paused then DO NOT WRITE TO WRITABLE
if (paused) {
if (buffer.length < highWaterMark) {
// let's just pre-emptively pull some stuff out of source
recurse()
}
return
}
// pause right before writing to writable
paused = true
// write item to writable!
var item = buffer.shift()
writable(item, function () {
// unpause once writable says I WANT MORE
paused = false
// tell source we want more
recurse()
})
})
}
}
var fileR = reader("./package.json")
var fileW = writer("./package.json")
var file = duplex(fileR, fileW)
file(function (chunk, recurse) {
// I AM READING FROM FILE
})
file("LOL DATA", function (er) {
// I HAVE WRITTEN LOL DATA TO FILE
})
function duplex(readable, writable) {
return function (chunk, recurse) {
// called as a readable
if (typeof chunk === "function") {
readable(chunk)
} else {
writable(chunk, recurse)
}
}
}
// SO SIMPLE
function map(readable, mapper) {
return function (writable) {
readable(function (chunk, recurse) {
writable(mapper(chunk), recurse)
})
}
}
// WINNING
function asyncMap(readable, mapper) {
return function (writable) {
readable(function (chunk, recurse) {
mapper(chunk, function (result) {
writable(result, recurse)
})
})
}
}
// http://jsfiddle.net/y5zUc/2/
var stream = readstream(function (writable, recurse) {
// we write some chunk to writable which will call our callback
// when it has flushed it.
writable("some chunk", recurse)
})
stream(writable)
// a writable is a function that takes chunks and writes it somewhere
function writable(chunk, recurse) {
// do somethign with chunk
console.log("written chunk", chunk)
// pull more data out of source in 1s
setTimeout(recurse, 1000)
}
function readstream(readable) {
return function (writable) {
// we pump from readable to writer immediately
recurse()
// we pass readable the writer & loop so that it can write
// some more
function recurse() {
readable(writable, recurse)
}
}
}
function cursorStream(opts) {
var source
return readstream(function (writable, recurse) {
if (!source) {
source = db.cursor(opts)
}
cursor.nextObject(function (er, item) {
if (er) return writable(er)
if (item === null) return writable(null)
writable(item, recurse)
})
})
}
function socketStream(uri) {
return function readable(writable) {
var socket = connect(uri)
socket.readStart()
socket.ondata = function (chunk) {
writable(chunk, function () {
socket.readStart()
})
socket.readStop()
}
socket.onend = function () {
writable(null)
}
socket.onerror = function (er) {
writable(er)
}
}
}
var fs = require("fs")
var file = reader("./package.json")
file(function (chunk, recurse) {
/* got chunk of file */
if (chunk === null) {
/* EOF */
} else if (chunk instanceof Error) {
/* OOPS */
} else {
/* file is a stream of buffers. So this is a buffer */
// ensure you either always recurse immediately or hold the recurse
// callback so that you can call it later when your ready to
// pull more from source
recurse()
}
})
function reader(filePath) {
var fd, index = 0
var withClose = function (recurse) {
return function (value) {
if (value === "STOP") {
/* close file descriptor */
} else {
recurse()
}
}
}
return readstream(function (writable, recurse) {
recurse = withClose(recurse)
if (!fd) {
fs.open(filePath, "r+", function (er, _fd) {
if (er) return writable(er)
fd = _fd
read(fd, writable, recurse)
})
} else {
read(fd, writable, recurse)
}
})
function read(fd, writable, recurse) {
var buffer = new Buffer(8192)
fs.read(fd, buffer, index, 8192, -1, function (er, bytesRead) {
if (er) return writable(er)
if (bytesRead === 0) {
return writable(null)
}
index += bytesRead
writable(buffer, recurse)
})
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.