Skip to content

Instantly share code, notes, and snippets.

@Raynos
Last active December 15, 2015 07:49
Show Gist options
  • Save Raynos/e771cbd9d6b188972c74 to your computer and use it in GitHub Desktop.
Save Raynos/e771cbd9d6b188972c74 to your computer and use it in GitHub Desktop.
Recurse Stream 2.0
// http://jsfiddle.net/zG99A/10/
//
// Readable is a function(Function:Writable writable, Boolean isClosed)
// Writable is a function(Object chunk, Function:Readable recurse)
// Duplex is a function(Function:Readable readable) return Function:Readable
// Build a readable that hands out the elements of `list` one per call.
//
// Each call to the returned readable does one of:
//   - returns null without touching `writable` when `isClosed` is truthy
//   - calls writable(null) once every element has been handed out
//   - calls writable(element, readable) so the consumer can ask for more
// In the last two cases the return value of `writable` is passed through.
function createReadArray(list) {
    var index = 0

    return function readable(writable, isClosed) {
        if (isClosed) {
            return null
        }

        if (index === list.length) {
            // exhausted: signal the end with a single null argument
            return writable(null)
        }

        var chunk = list[index]
        index += 1
        return writable(chunk, readable)
    }
}
// Writable sink: print the chunk, then ask the upstream readable for the
// next one.
//
// chunk   - the value handed over by the upstream readable
// recurse - optional readable to call for more data; it may be absent, in
//           which case nothing further is requested
function logger(chunk, recurse) {
    console.log("chunk", chunk)

    // console.log returns undefined, so chaining the request onto it with
    // `&&` would short-circuit and never pull the next chunk.
    if (recurse) {
        recurse(logger)
    }
}
// Two reusable duplex transforms built with map.
var double = map(function (n) {
    return n * 2
});
var square = map(function (n) {
    return n * n
});

// Composition by plain function application: the array source is wrapped by
// double, then by square, and the resulting readable is started with logger.
square(double(createReadArray([1, 2, 3])))(logger);

// The same chain expressed with the fluent pipe() helper added by pipeable.
pipeable(createReadArray([1, 2, 3]))
    .pipe(double)
    .pipe(square)(logger);

// A longer chain: buffer, map, filter and takeWhile, again started by
// calling the final readable with logger.
pipeable(createReadArray([1, 2, 3, 4]))
    .pipe(buffer(5))
    .pipe(map(function (x) {
        return x * 2
    }))
    .pipe(filter(function (x) {
        return x !== 4
    }))
    .pipe(takeWhile(function (x) {
        return x < 7
    }))(logger);
// Create a duplex that queues chunks from `source` and forwards them to the
// destination one at a time, waiting for the destination to ask for more
// before forwarding the next one.
//
// highWaterMark - maximum number of queued chunks at which the buffer stops
//                 asking the source for more while the destination is busy.
//                 Defaults to 100 when undefined.
//
// Returns function duplex(source) which returns
// function readable(destination, isClosed).
function buffer(highWaterMark) {
    highWaterMark = highWaterMark === undefined ? 100 : highWaterMark

    return function duplex(source) {
        var list = []
        var paused = false
        var ended = false
        var closed = false

        // Readable handed to the destination once the source has ended:
        // each call forwards one queued item until the queue is empty.
        function drainBuffer(writable, isClosed) {
            // the destination may close while we are draining; remember it
            // so we never try to call a missing writable
            if (isClosed) {
                closed = true
            }

            if (list.length === 0 || closed) {
                return
            }

            var item = list.shift()
            writable(item, drainBuffer)
        }

        return function readable(destination, isClosed) {
            source(function writable(chunk) {
                if (closed) {
                    return
                } else if (!ended) {
                    list.push(chunk)
                }

                // the source has halted. We should drain this buffer into
                // the destination
                if (chunk === null || chunk instanceof Error) {
                    ended = true
                    return drainBuffer(destination)
                } else if (paused) {
                    // destination is still busy: keep reading ahead only
                    // while the queue is below the high water mark.
                    // (list.length, not buffer.length: the latter is the
                    // arity of this factory function and is always 1)
                    return list.length < highWaterMark ?
                        readable(destination) : null
                }

                // pause right before writing to the destination
                paused = true

                // forward the oldest queued item to the destination (not to
                // this inner writable, which would only re-queue it)
                var item = list.shift()
                destination(item, function (destination, isClosed) {
                    if (isClosed) {
                        closed = true
                    }

                    // unpause once the destination says I WANT MORE
                    paused = false

                    // if ended then drain else tell source we want more
                    ended ? drainBuffer(destination, isClosed) :
                        readable(destination, isClosed)
                })
            }, isClosed)
        }
    }
}
// readable :: function (writable, isClosed)
// writable :: function (end, data)
// duplex :: function (readable) return readable
// Build a directory walker rooted at the current working directory. For each
// name it is given, the factory returns a readList readable whose list is
// the result of fs.readdir on that name.
// NOTE(review): `fs` is not defined in the visible part of this file —
// presumably Node's fs module; confirm it is required elsewhere.
// The readable returned by depthFirst is not consumed here.
depthFirst(process.cwd(), function (dir) {
    function loadEntries(callback) {
        fs.readdir(dir, function (err, files) {
            // a plain file has no entries: report it as an empty list
            if (err && err.code === "ENOTDIR") {
                return callback(null, [])
            }
            if (err) {
                return callback(err)
            }
            callback(null, files)
        })
    }

    return readList(loadEntries)
})
// Create a readable (writable(end, data) protocol) over a list that is
// fetched lazily on the first read.
//
// getList - function(callback) that must call callback(err) on failure or
//           callback(null, array) with the items to emit.
//
// The returned readable(writable, isClosed):
//   - does nothing when isClosed is truthy
//   - calls writable(err) if getList reports an error
//   - calls writable(null, item) for each item, one per read
//   - calls writable(true) once every item has been emitted
function readList(getList) {
    var list

    return function readable(writable, isClosed) {
        if (isClosed) return
        if (list) return send()

        getList(function (err, result) {
            if (err) {
                return writable(err)
            }

            // work on a copy so the caller's array is not drained
            list = result.slice()
            send()
        })

        function send() {
            // decide on length, not on the item's truthiness, so that falsy
            // items such as "" or 0 do not end the stream early
            if (list.length === 0) return writable(true)

            return writable(null, list.shift())
        }
    }
}
// Create a readable (writable(end, data) protocol) that walks a tree in
// depth-first order.
//
// initial        - value passed to createReadable to build the root stream
// createReadable - function(value) returning a readable that emits the
//                  children of `value` and then ends with writable(true)
//
// Every emitted chunk is both forwarded to the consumer and used to open a
// child stream. The child stream is placed in front of the pending streams,
// so its contents are read before the remaining siblings of its parent.
//
// The returned readable(writable, isClosed):
//   - closes every pending stream when isClosed is truthy
//   - calls writable(true) when no streams remain
//   - calls writable(err) when the current stream reports an error
//   - calls writable(null, chunk) for each visited chunk
function depthFirst(initial, createReadable) {
    var streams = [createReadable(initial)]

    return function readable(writable, isClosed) {
        if (isClosed) return streams.forEach(function (s) {
            s(null, true)
        })

        if (streams.length === 0) return writable(true)

        streams[0](function (err, chunk) {
            if (err === true) {
                // current stream is finished: drop it and carry on with
                // whichever stream is now in front
                streams.shift()
                return readable(writable)
            } else if (err) {
                return writable(err)
            }

            // put the child in FRONT so it is read next; appending it would
            // visit all siblings first, i.e. breadth-first order
            streams.unshift(createReadable(chunk))
            writable(null, chunk)
        })
    }
}
// Build a duplex factory from a per-chunk handler.
//
// process - function(chunk, destination, recurse) called for every ordinary
//           chunk; `recurse` is this duplex's own readable.
//
// Returns function duplex(source), which returns
// function readable(destination, isClosed). Each read pulls once from
// `source`, forwarding `isClosed`. A null chunk or an Error instance is
// handed straight to destination with no second argument and bypasses
// `process`.
function Duplex(process) {
    return function duplex(source) {
        return function readable(destination, isClosed) {
            function writable(chunk) {
                var isTerminal = chunk === null || chunk instanceof Error

                if (isTerminal) {
                    destination(chunk)
                    return
                }

                process(chunk, destination, readable)
            }

            source(writable, isClosed)
        }
    }
}
// pipeable decorates a readable with a pipe method and returns that same
// readable. pipe(duplex) applies the duplex to the readable and makes the
// resulting readable pipeable as well, so calls can be chained.
function pipeable(readable) {
    readable.pipe = function (duplex) {
        var piped = duplex(readable)
        return pipeable(piped)
    }

    return readable
}
// Duplex that replaces every ordinary chunk with lambda(chunk) before
// handing it to the destination together with the readable to pull from.
function map(lambda) {
    function process(chunk, dest, recurse) {
        var mapped = lambda(chunk)
        dest(mapped, recurse)
    }

    return Duplex(process)
}
// Duplex that forwards only the chunks for which lambda(chunk) is truthy.
// A rejected chunk is dropped and the next one is requested right away on
// behalf of the destination.
function filter(lambda) {
    return Duplex(function process(chunk, dest, recurse) {
        if (lambda(chunk)) {
            dest(chunk, recurse)
        } else {
            recurse(dest)
        }
    })
}
// Duplex that forwards chunks while lambda(chunk) is truthy. On the first
// chunk that fails the test, the upstream is asked to close by calling
// recurse(null, true); nothing is sent to the destination for that chunk.
function takeWhile(lambda) {
    return Duplex(function process(chunk, dest, recurse) {
        if (lambda(chunk)) {
            dest(chunk, recurse)
        } else {
            recurse(null, true)
        }
    })
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment