Skip to content

Instantly share code, notes, and snippets.

@AutoSponge
Last active May 8, 2017 01:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save AutoSponge/b7f8494aa34f065d167738bf2bd8f870 to your computer and use it in GitHub Desktop.
Save AutoSponge/b7f8494aa34f065d167738bf2bd8f870 to your computer and use it in GitHub Desktop.
fast-json-stringify stream collection
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Hello World Streaming JSON Client</title>
<script src="https://rawgit.com/jimhigson/oboe.js/master/dist/oboe-browser.min.js"></script>
</head>
<body>
<button type="button"></button>
<script>
// Shared streaming state: `value` counts the items received so far and `done`
// is the flag the stream callback checks to decide whether to abort.
const counter = { value: 0, done: false }
/**
 * Builds the callback that renders one streamed item.
 *
 * @param {{hello: *}} data - Streamed record; `hello` is shown as the text of a new paragraph.
 * @param {number} n - Running item count shown on the button.
 * @returns {function(): void} Callback that updates the button label and appends the paragraph.
 */
const renderNext = (data, n) => () => {
  document.querySelector('button').innerText = `press to stop ${n}`
  // Set the value as text instead of splicing it into an HTML string, so any
  // markup inside the payload is displayed literally rather than parsed.
  const paragraph = document.createElement('p')
  paragraph.textContent = `${data.hello}`
  document.body.appendChild(paragraph)
}
// Stream the array from the API, forwarding the page's own query string, and
// handle each top-level array element as soon as it has been parsed.
oboe(`/api/http${window.location.search}`)
  .node({
    '![*]' (item) {
      counter.value += 1
      window.requestAnimationFrame(renderNext(item, counter.value))
      if (counter.done !== true) {
        // There is no #done callback here, so drop the parsed item to release
        // it from memory.
        return oboe.drop
      }
      // The stop button was pressed: abandon the request.
      return this.abort()
    }
  })
// Pressing the button asks the stream handler to abort on the next item.
document.querySelector('button').addEventListener('click', () => {
  counter.done = true
})
</script>
</body>
</html>
const app = require('express')()
const fastJson = require('fast-json-stringify')
const faker = require('faker')
const a = require('awaiting')
const ecstatic = require('ecstatic')
const map = require('./map')
const take = require('./take')
const pipe = require('./pipe')
/**
 * Endless source of `{ hello }` records, each carrying a freshly generated
 * fake name.
 *
 * A thunk that returns an array works in its place as well, e.g.
 *   () => [{ hello: 'Darth Vader' }, { hello: 'Luke Skywalker' }]
 */
const names = function * () {
  for (;;) {
    yield { hello: faker.name.findName() }
  }
}
// JSON schemas used by this server.
// `out` describes one streamed record: an object with a single string
// property `hello`. It is handed to fast-json-stringify to build the serializer.
const schema = {
out: {
type: 'object',
properties: {
hello: { type: 'string' }
}
}
}
/**
 * Creates a pump that writes every value of an iterator to the response while
 * honouring backpressure and noticing when the client goes away.
 *
 * The pump only ever removes the listeners it registered itself, so 'close'
 * and 'drain' handlers installed by other code stay in place and nothing
 * accumulates across repeated waits.
 *
 * @param {object} req - Incoming request; its socket is watched for 'close'.
 * @param {object} res - Response to write to; must provide `write` and emit 'drain'.
 * @returns {function(Iterator): Promise<(true|undefined)>} Async pump. It
 *   resolves to `true` once the iterator is exhausted and to `undefined` when
 *   the client disconnected first. It never ends the response itself.
 */
const createPump = (req, res) => {
  // `req.connection` is the older alias of `req.socket`; accept either.
  const socket = req.socket || req.connection

  // Resolves to `true` on the next 'drain' of the response and to `false` when
  // the socket closes first (or is already gone).
  const waitForDrainOrClose = () =>
    new Promise(resolve => {
      // A destroyed socket will not emit 'close' again; waiting would hang.
      if (socket.destroyed === true) {
        resolve(false)
        return
      }
      const settle = drained => {
        // Remove both listeners, whichever event won, so none are left behind.
        res.removeListener('drain', onDrain)
        socket.removeListener('close', onClose)
        resolve(drained)
      }
      const onDrain = () => settle(true)
      const onClose = () => settle(false)
      res.once('drain', onDrain)
      socket.once('close', onClose)
    })

  return async g => {
    let connected = true
    while (connected) {
      const { value, done } = g.next()
      if (done === true) {
        return true
      }
      // `write` returning false signals backpressure: pause until the buffer
      // drains or the client disconnects.
      if (res.write(value) === false) {
        connected = await waitForDrainOrClose()
      }
    }
  }
}
// Serializer built once from the output schema and reused for every record.
const stringify = fastJson(schema.out)

// Prefixes a serialized record with its delimiter: the first record opens the
// array, every later record is preceded by a comma on its own line.
const formatJSONStream = (val, i) => {
  const prefix = i === 0 ? '[\n' : '\n,\n'
  return `${prefix}${val}`
}

// names -> serialized records -> delimited chunks.
const JSONStream = pipe(
  take(names),
  map(stringify),
  map(formatJSONStream)
)

// Default number of records when the request gives no limit.
const theWholeWorld = 7.5e9
/**
 * GET /api/http — streams generated records as one JSON array.
 *
 * Query: `limit` (optional) — number of records to send. A value that is not
 * a non-negative integer falls back to `theWholeWorld`. Without that check a
 * value such as `?limit=abc` turns into NaN, never equals a record index, and
 * the stream never ends.
 */
app.get('/api/http', (req, res, next) => {
  const requested = Number(req.query.limit)
  const limit = Number.isInteger(requested) && requested >= 0
    ? requested
    : theWholeWorld
  res.set('Content-Type', 'application/stream+json')
  const pump = createPump(req, res)
  // Stop once `limit` records have been produced.
  const limitFn = (x, i) => i >= limit
  pump(JSONStream(limitFn))
    .then(finished => {
      // The opening '[' travels with the first record. When nothing was
      // written (limit 0) the headers are still unsent, so open the array here
      // to keep the body valid JSON.
      res.end(res.headersSent ? '\n]\n' : '[\n]\n')
      console.log({ finished })
    })
    .catch(next)
})
// Static files are served from ./public.
app.use(ecstatic({ root: `${__dirname}/public` }))
app.use(require('./src/handle-error'))
app.listen(4000)
// Alternate method that uses Transducers instead of the custom map/take fns
// Only issue is that collections always end with trailing comma because
// the map() does not provide index
// Did not seem to work with an Array the way the original did
const app = require('express')()
const fastJson = require('fast-json-stringify')
const faker = require('faker')
const a = require('awaiting')
const ecstatic = require('ecstatic')
const pipe = require('./src/pipe')
const { map, take } = require('transducers')
// Record sources. `hello` is an endless generator of `{ hello }` objects, each
// carrying a freshly generated fake name.
const names = {
  * hello () {
    for (;;) {
      yield { hello: faker.name.findName() }
    }
  }
}
// JSON schemas used by this server.
// `out` describes one streamed record: an object with a single string
// property `hello`. It is handed to fast-json-stringify to build the serializer.
const schema = {
out: {
type: 'object',
properties: {
hello: { type: 'string' }
}
}
}
/**
 * Creates a pump that writes every value of an iterator to the response while
 * honouring backpressure and noticing when the client goes away.
 *
 * The pump only ever removes the listeners it registered itself, so 'close'
 * and 'drain' handlers installed by other code stay in place and nothing
 * accumulates across repeated waits.
 *
 * @param {object} req - Incoming request; its socket is watched for 'close'.
 * @param {object} res - Response to write to; must provide `write` and emit 'drain'.
 * @returns {function(Iterator): Promise<(true|undefined)>} Async pump. It
 *   resolves to `true` once the iterator is exhausted and to `undefined` when
 *   the client disconnected first. It never ends the response itself.
 */
const createPump = (req, res) => {
  // `req.connection` is the older alias of `req.socket`; accept either.
  const socket = req.socket || req.connection

  // Resolves to `true` on the next 'drain' of the response and to `false` when
  // the socket closes first (or is already gone).
  const waitForDrainOrClose = () =>
    new Promise(resolve => {
      // A destroyed socket will not emit 'close' again; waiting would hang.
      if (socket.destroyed === true) {
        resolve(false)
        return
      }
      const settle = drained => {
        // Remove both listeners, whichever event won, so none are left behind.
        res.removeListener('drain', onDrain)
        socket.removeListener('close', onClose)
        resolve(drained)
      }
      const onDrain = () => settle(true)
      const onClose = () => settle(false)
      res.once('drain', onDrain)
      socket.once('close', onClose)
    })

  return async g => {
    let connected = true
    while (connected) {
      const { value, done } = g.next()
      if (done === true) {
        return true
      }
      // `write` returning false signals backpressure: pause until the buffer
      // drains or the client disconnects.
      if (res.write(value) === false) {
        connected = await waitForDrainOrClose()
      }
    }
  }
}
// Serializer built once from the output schema and reused for every record.
const stringify = fastJson(schema.out)

/**
 * Builds the delimiter-adding formatter for ONE response.
 *
 * The separator is written before each record (a newline for the first,
 * newline-comma-newline afterwards), so the array never ends with a trailing
 * comma. The "first record" flag lives in the closure because this `map` does
 * not pass an index.
 * NOTE(review): assumes `map` calls the formatter once per record, in order.
 *
 * @returns {function(*): string} Formatter for the records of one response.
 */
const createJSONStreamFormatter = () => {
  let isFirst = true
  return val => {
    const separator = isFirst ? '\n' : '\n,\n'
    isFirst = false
    return `${separator}${val}`
  }
}

// The pipeline is assembled per call so the separator state is never shared
// between responses.
const transform = source =>
  pipe(map(stringify), map(createJSONStreamFormatter()))(source)
const theWholeWorld = 7.5 * 1e9

/**
 * GET /api/http — streams generated records as one JSON array.
 *
 * Query: `limit` (optional) — number of records to send. A value that is not
 * a non-negative integer falls back to `theWholeWorld`; the limit is always
 * handed to `take` as a number, never as the raw query string.
 */
app.get('/api/http', (req, res, next) => {
  const requested = Number(req.query.limit)
  const limit = Number.isInteger(requested) && requested >= 0
    ? requested
    : theWholeWorld
  res.set('Content-Type', 'application/stream+json')
  const pump = createPump(req, res)
  res.write('[')
  pump(transform(take(limit)(names.hello())))
    .then(finished => {
      res.end('\n]\n')
      console.log({ finished })
    })
    .catch(next)
})
// Static files are served from ./public.
app.use(ecstatic({ root: `${__dirname}/public` }))
app.use(require('./src/handle-error'))
app.listen(4000)
const app = require('express')()
const fastJson = require('fast-json-stringify')
const faker = require('faker')
const a = require('awaiting')
const ecstatic = require('ecstatic')
const pipe = require('./src/pipe')
const map = require('./src/map')
const take = require('./src/take')
// Record sources. `hello` is an endless generator of `{ hello }` objects, each
// carrying a freshly generated fake name.
const names = {
  * hello () {
    for (;;) {
      yield { hello: faker.name.findName() }
    }
  }
}
const json = {
  /**
   * Wraps an iterator of already-serialized records into the chunks of one
   * JSON array: '[' with the first record, a comma line before each later
   * record, and a closing ']'.
   *
   * An empty source produces '[' followed by the closing chunk, so the
   * concatenated output is still a valid (empty) JSON array.
   *
   * @param {Iterator<string>} g - Iterable iterator (e.g. a generator object) of serialized records.
   * @yields {string} Consecutive chunks of the array text.
   */
  * stream (g) {
    const first = g.next()
    yield first.done === true ? '[' : `[\n${first.value}`
    // Continues after the record consumed above; an exhausted source simply
    // contributes nothing here.
    for (const value of g) {
      yield `\n,\n${value}`
    }
    yield '\n]\n'
  }
}
// JSON schemas used by this server.
// `out` describes one streamed record: an object with a single string
// property `hello`. It is handed to fast-json-stringify to build the serializer.
const schema = {
out: {
type: 'object',
properties: {
hello: { type: 'string' }
}
}
}
/**
 * Creates a pump that writes every value of an iterator to the response while
 * honouring backpressure and noticing when the client goes away.
 *
 * The pump only ever removes the listeners it registered itself, so 'close'
 * and 'drain' handlers installed by other code stay in place and nothing
 * accumulates across repeated waits.
 *
 * @param {object} req - Incoming request; its socket is watched for 'close'.
 * @param {object} res - Response to write to; must provide `write` and emit 'drain'.
 * @returns {function(Iterator): Promise<(true|undefined)>} Async pump. It
 *   resolves to `true` once the iterator is exhausted and to `undefined` when
 *   the client disconnected first. It never ends the response itself.
 */
const createPump = (req, res) => {
  // `req.connection` is the older alias of `req.socket`; accept either.
  const socket = req.socket || req.connection

  // Resolves to `true` on the next 'drain' of the response and to `false` when
  // the socket closes first (or is already gone).
  const waitForDrainOrClose = () =>
    new Promise(resolve => {
      // A destroyed socket will not emit 'close' again; waiting would hang.
      if (socket.destroyed === true) {
        resolve(false)
        return
      }
      const settle = drained => {
        // Remove both listeners, whichever event won, so none are left behind.
        res.removeListener('drain', onDrain)
        socket.removeListener('close', onClose)
        resolve(drained)
      }
      const onDrain = () => settle(true)
      const onClose = () => settle(false)
      res.once('drain', onDrain)
      socket.once('close', onClose)
    })

  return async g => {
    let connected = true
    while (connected) {
      const { value, done } = g.next()
      if (done === true) {
        return true
      }
      // `write` returning false signals backpressure: pause until the buffer
      // drains or the client disconnects.
      if (res.write(value) === false) {
        connected = await waitForDrainOrClose()
      }
    }
  }
}
// Serializer built once from the output schema and reused for every record.
const stringify = fastJson(schema.out)
// names -> serialized records -> chunks of one JSON array. The value passed to
// `process(...)` is forwarded to the generator returned by `take`.
// NOTE(review): this binding shadows Node's global `process` inside this module.
const process = pipe(take(names.hello), map(stringify), json.stream)
const theWholeWorld = 7.5 * 1e9

/**
 * GET /api/http — streams generated records as one JSON array.
 *
 * Query: `limit` (optional) — number of records to send. A value that is not
 * a non-negative integer falls back to `theWholeWorld`.
 */
app.get('/api/http', (req, res, next) => {
  const requested = Number(req.query.limit)
  const limit = Number.isInteger(requested) && requested >= 0
    ? requested
    : theWholeWorld
  res.set('Content-Type', 'application/stream+json')
  const pump = createPump(req, res)
  // The `take` helper shown in this gist calls its argument as a stop
  // predicate `(value, index)`. Passing the raw limit made it throw
  // "fn is not a function" on the first record, so wrap the count here.
  const limitFn = (x, i) => i >= limit
  pump(process(limitFn))
    .then(finished => {
      // `json.stream` already emitted the closing bracket.
      res.end()
      console.log({ finished })
    })
    .catch(next)
})
// Static files are served from ./public.
app.use(ecstatic({ root: `${__dirname}/public` }))
app.use(require('./src/handle-error'))
app.listen(4000)
module.exports = fn =>
function * (iterable) {
let i = 0
for (const x of iterable) {
yield fn(x, i++)
}
}
{
"scripts": {
"start": "node ./index.js"
},
"dependencies": {
"awaiting": "^2.2.0",
"callsite-record": "^4.0.0",
"ecstatic": "^2.1.0",
"express": "^4.15.2",
"faker": "^4.1.0",
"fast-json-stringify": "^0.11.0"
}
}
module.exports = (head, ...tail) => (...args) =>
tail.reduce((value, fn) => fn(value), head(...args))
const never = () => false
module.exports = iterableFn =>
function * (fn = never) {
let iterable = iterableFn()
let i = 0
for (const x of iterable) {
if (fn(x, i)) {
break
}
yield x
i++
}
}
@AutoSponge
Copy link
Author

AutoSponge commented May 3, 2017

Thanks to @mcollina for a little guidance here.

Still a WIP. But I think I'm getting closer (and learning a lot).

  • I learned more about streams and their "rules".
  • I learned what backpressure is and how to listen for it.
  • I realized that when the connection was closed by the client, my sendAll would hang awaiting the drain event. With the Promise.race looking for either a drain or a closed connection, I can prove it finishes properly, which should release the memory.
  • I learned that the Promise.race caused once handlers to get added but were not removed. So, multiple drain events would hit maxlisteners error for close event listeners. Now the .then() for a drain unregisters all close and vice versa. Not sure if this is better in the .then() or if it should be in the if (ready === false) block to remove both. Since awaiting hides the callback used, I can't remove the exact listener, hence the use of removeAllListeners. I searched express's code and couldn't find anything using close or drain events. However, this could have a negative effect on other middleware that may need these events. I may refactor to remove awaiting so I can safely remove individual listeners. I only need to remove the close listener when a drain happens, so that's done at the top of the while loop now and I ignore cleaning up the drain listener when we close (since we're closed and going to return--but if this doesn't release memory I'll add it back).
  • I decided to re-initialize sendAll in the handler so we can apply options to the creation of the iterable. This was useful for sending ?limit=10 during testing.
  • I was able to refactor sendAll into pump. This did a few things:
    1. extract/restrict the stringify behavior to the generator composition
    2. make pump more generic so it could be used for different serializers
    3. reduce the number of times removeAllListeners is called
    4. pump resolves to true if the iterator finished which is helpful in determining what to send if the same client connects again.
    5. Since the serializer could change, it's important to end the stream in the .then() or .catch().
  • pipe helps compose the generators and iterators into a lazy process that only initializes the iterator (inside take) when the resulting generator is invoked.
  • map allows me to write single-responsibility functions that only understand values and index and operate on the lazily-evaluated values of the generator.
  • I realized that the json.stream could be a generator that takes the result of piping the other functions, which means it can come last in the pipe. By going back to the hand-rolled version of take, I can put the entire process together outside of the request listener. This also simplifies map so it no longer needs the index value. I also don't need to send the final ] when ending the stream.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment