Skip to content

Instantly share code, notes, and snippets.

@Raynos
Created September 7, 2012 03:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Raynos/3662920 to your computer and use it in GitHub Desktop.
Save Raynos/3662920 to your computer and use it in GitHub Desktop.
Crazy Streams

Lazy streams

A lazy stream takes a readable stream and wraps it.

Any reads or pipes on it propagate to the original stream.

This allows you to model your program flow as small transformations on streaming data, and to apply them only when you actually pipe the wrapped input stream into an output stream.

var Readable = require("readable-stream")
, ENTER = 13
, through = require("through")
, uuid = require("node-uuid")
// Application wiring. Every value below is either a stream derived from DOM
// events or a DOM node. The map/filter/reduce callbacks only run when the
// resulting stream is read or piped.

// Every keypress anywhere in the document.
var presses = DOMEventStream(document.documentElement, "keypress")
    // Keypresses where the Enter key was hit.
    , enters = FilterStream(presses, function (event) {
        return event.keyCode === ENTER
    })
    // Text of the field that Enter was pressed in.
    , incomingTitles = MapStream(enters, function (event) {
        return event.target.value
    })
    // Drop empty titles.
    , validTitles = FilterStream(incomingTitles, function (title) {
        return !!title
    })
    // One todo model per accepted title.
    , newTodos = MapStream(validTitles, function (title) {
        return {
            cid: uuid()
            , title: title
            , done: false
        }
    })
    // Renders a model as:
    //   li#<cid> > div.view > (input.toggle[type=checkbox], label, button.destroy)
    //   li#<cid> > input.edit
    , newTodoEls = MapStream(newTodos, function (model) {
        var li = document.createElement("li")
            , view = document.createElement("div")
            , checkbox = document.createElement("input")
            , label = document.createElement("label")
            , destroyButton = document.createElement("button")
            , inputField = document.createElement("input")

        li.id = model.cid
        view.classList.add("view")
        checkbox.type = "checkbox"
        checkbox.classList.add("toggle")
        label.textContent = model.title
        destroyButton.classList.add("destroy")
        inputField.classList.add("edit")
        inputField.value = model.title

        li.appendChild(view)
        view.appendChild(checkbox)
        view.appendChild(label)
        view.appendChild(destroyButton)
        li.appendChild(inputField)

        return li
    })
    , todosEl = document.getElementById("todo-list")
    // Running number of todos created. The seed is 0 so that the first todo
    // yields 1 (a seed of 1 reported "2 items left" for a single todo).
    , count = ReduceStream(newTodos, function (acc) {
        return acc + 1
    }, 0)
    , todoCount = document.getElementById('todo-count')
    // Every "change" event anywhere in the document.
    , toggleEvents = DOMEventStream(document.documentElement, "change")
    // Keep only changes from a todo's ".toggle" checkbox. Other inputs (for
    // example the ".edit" field) also fire "change", but they have no
    // meaningful `checked` state and sit at a different depth below the <li>.
    , checkboxChanges = FilterStream(toggleEvents, function (event) {
        return event.target.classList.contains("toggle")
    })
    // The checkbox sits at li > div.view > input, so the todo's id is found
    // two parents up.
    , toggleUpdates = MapStream(checkboxChanges, function (event) {
        return {
            id: event.target.parentElement.parentElement.id
            , done: event.target.checked
        }
    })

// Newly rendered todos are inserted at the top of the list.
newTodoEls.pipe(InsertBeforeStream(todosEl))
/**
 * Creates a `through` stream that prepends every element written to it to a
 * container node, so the most recently written element is always first.
 *
 * @param {Node} [container] Parent node to insert into. When omitted, the
 *     module-level `todosEl` is used (the behaviour before this parameter
 *     existed).
 * @returns {Object} The stream returned by `through`; pipe elements into it.
 */
function InsertBeforeStream(container) {
    return through(function (li) {
        // The fallback is resolved per write, so `todosEl` only needs to be
        // assigned by the time the first element arrives.
        var parent = container || todosEl
        parent.insertBefore(li, parent.firstChild)
    })
}
// Show the running todo count in the footer.
count.pipe(through(function (count) {
    todoCount.textContent = count + " items left"
}))

// Clear the field Enter was pressed in. This pipe is attached after the one
// that renders new todos, so the title has already been read from the field
// by the time it is cleared.
enters.pipe(through(function (event) {
    event.target.value = ""
}))

// Reflect a todo's done state as the "completed" class on its <li>.
toggleUpdates.pipe(through(function (update) {
    var todoEl = document.getElementById(update.id)

    // An update whose id matches no element (for example a "change" event
    // that came from outside a todo item) has nothing to restyle; skip it
    // rather than throwing on `null.classList`.
    if (!todoEl) {
        return
    }

    if (update.done) {
        todoEl.classList.add("completed")
    } else {
        todoEl.classList.remove("completed")
    }
}))
/**
 * Exposes events fired on a DOM node as a readable stream of event objects.
 *
 * A listener is registered on `elem` immediately. Incoming events are queued
 * in an internal buffer and handed out one at a time by `read()`, or pushed
 * to every piped target by `pipe()`.
 *
 * @param {EventTarget} elem Node to listen on.
 * @param {string} eventName Name of the DOM event to capture.
 * @returns {Readable} Stream whose `read`, `pipe` and `end` are replaced by
 *     the implementations below.
 */
function DOMEventStream(elem, eventName) {
    var stream = new Readable()
        , targets = []
        , flowing = false
        , eventBuffer = []

    // Returns the oldest buffered event, or null when the buffer is empty.
    stream.read = function () {
        var ev = eventBuffer.shift()
        return ev === undefined ? null : ev
    }

    // Registers `target` as a destination. Unless `opts.end === false`, the
    // stream's "end" event is re-emitted on the target.
    stream.pipe = function (target, opts) {
        if (!(opts && opts.end === false)) {
            reemit(stream, target, ["end"])
        }

        targets.push(target)
        target.emit("pipe", stream)

        // Start exactly one flow loop. It iterates the shared `targets`
        // list, so later pipe() calls only need to add their target. A
        // second loop would keep writing to targets that the first loop is
        // still waiting on for "drain".
        if (flowing === false) {
            flowing = true
            flow()
        }

        return target

        // Drains the buffer into every target, then waits for "readable".
        function flow() {
            var chunk = stream.read()
                , terminate = false
                , count = 0

            while (chunk) {
                targets.forEach(writeToTarget)

                // At least one target asked us to back off; next() resumes
                // the loop once all of them have drained.
                if (terminate) {
                    return
                }

                chunk = stream.read()
            }

            stream.once("readable", flow)

            function writeToTarget(target) {
                var written = target.write(chunk)

                if (!written) {
                    count++
                    terminate = true
                    target.once("drain", next)
                }
            }

            // Called once per drained target; restarts the loop after the
            // last one.
            function next() {
                if (--count === 0) {
                    flow()
                }
            }
        }
    }

    // Stops listening on the DOM node and signals "end".
    stream.end = function () {
        // The listener must be passed explicitly: removeEventListener needs
        // the same function reference that was registered.
        elem.removeEventListener(eventName, addToBuffer)
        stream.emit("end")
    }

    elem.addEventListener(eventName, addToBuffer)

    return stream

    // Queues the event; "readable" is emitted only when the buffer goes
    // from empty to non-empty.
    function addToBuffer(event) {
        eventBuffer.push(event)

        if (eventBuffer.length === 1) {
            stream.emit("readable")
        }
    }
}
/**
 * Wraps a readable stream so that only chunks accepted by a predicate come
 * out of it. "readable" and "end" events of the source are re-emitted on the
 * wrapper.
 *
 * @param {Object} stream Source stream providing `read`, `pipe` and `end`.
 * @param {Function} filterPredicate Called with each chunk; a truthy result
 *     keeps the chunk.
 * @returns {Readable} The filtering wrapper.
 */
function FilterStream(stream, filterPredicate) {
    var filtered = new Readable()

    reemit(stream, filtered, ["readable", "end"])

    // Pull from the source until a chunk passes the predicate or the source
    // has nothing more to give (signalled by null).
    filtered.read = function (n) {
        var candidate = stream.read(n)

        while (candidate !== null) {
            if (filterPredicate(candidate)) {
                return candidate
            }
            candidate = stream.read(n)
        }

        return null
    }

    // Pipe the source through an intermediate stream that forwards only the
    // accepted chunks, then on to the target.
    filtered.pipe = function (target) {
        var gate = through(function (chunk) {
            if (!filterPredicate(chunk)) {
                return
            }
            this.emit("data", chunk)
        })

        return stream.pipe(gate).pipe(target)
    }

    // Ending the wrapper ends the source.
    filtered.end = stream.end

    return filtered
}
/**
 * Wraps a readable stream so that every chunk is transformed by `mapper`
 * before it comes out. "readable" and "end" events of the source are
 * re-emitted on the wrapper.
 *
 * @param {Object} stream Source stream providing `read`, `pipe` and `end`.
 * @param {Function} mapper Called with each chunk; its result replaces the
 *     chunk.
 * @returns {Readable} The mapping wrapper.
 */
function MapStream(stream, mapper) {
    var mapped = new Readable()

    reemit(stream, mapped, ["readable", "end"])

    // null from the source means "nothing available" and is passed through
    // untouched; anything else is mapped.
    mapped.read = function () {
        var raw = stream.read()

        return raw === null ? null : mapper(raw)
    }

    // Pipe the source through an intermediate stream that emits the mapped
    // value of each chunk, then on to the target.
    mapped.pipe = function (target) {
        var transform = through(function (chunk) {
            this.emit("data", mapper(chunk))
        })

        return stream.pipe(transform).pipe(target)
    }

    // Ending the wrapper ends the source.
    mapped.end = stream.end

    return mapped
}
/**
 * Wraps a readable stream so that each chunk is folded into a running
 * accumulator, and the updated accumulator is what comes out. "readable" and
 * "end" events of the source are re-emitted on the wrapper.
 *
 * The accumulator is shared by `read()` and by every `pipe()` made on the
 * returned stream.
 *
 * @param {Object} stream Source stream providing `read`, `pipe` and `end`.
 * @param {Function} reducer Called as `reducer(accumulator, chunk)`; its
 *     result becomes the new accumulator.
 * @param {*} initial Starting value of the accumulator.
 * @returns {Readable} The reducing wrapper.
 */
function ReduceStream(stream, reducer, initial) {
    var reduced = new Readable()
        , accumulator = initial

    reemit(stream, reduced, ["readable", "end"])

    // Folds one chunk into the accumulator and returns the new value.
    function fold(chunk) {
        accumulator = reducer(accumulator, chunk)
        return accumulator
    }

    // null from the source means "nothing available" and is passed through
    // without touching the accumulator.
    reduced.read = function () {
        var chunk = stream.read()

        return chunk === null ? null : fold(chunk)
    }

    // Pipe the source through an intermediate stream that emits the updated
    // accumulator for each chunk, then on to the target.
    reduced.pipe = function (target) {
        var folder = through(function (chunk) {
            this.emit("data", fold(chunk))
        })

        return stream.pipe(folder).pipe(target)
    }

    // Ending the wrapper ends the source.
    reduced.end = stream.end

    return reduced
}
/**
 * Forwards the named events from one emitter to another: whenever `source`
 * emits one of `events`, `target` emits the same event with the same
 * arguments.
 *
 * @param {Object} source Emitter to listen on (must provide `on`).
 * @param {Object} target Emitter to re-emit on (must provide `emit`).
 * @param {string[]} events Names of the events to forward.
 */
function reemit(source, target, events) {
    events.forEach(forward)

    // Installs one forwarding listener for a single event name.
    function forward(eventName) {
        source.on(eventName, function () {
            var payload = Array.prototype.slice.call(arguments)

            target.emit.apply(target, [eventName].concat(payload))
        })
    }
}
@domenic
Copy link

domenic commented Sep 7, 2012

Hmm, why did you have to implement pipe for DOMEventStream? That seemed painful.

@Raynos
Copy link
Author

Raynos commented Sep 15, 2012

@domenic I implemented pipe because readable-stream had a bug. It's a temporary fix for that bug.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment