Last active
May 8, 2017 01:36
-
-
Save AutoSponge/b7f8494aa34f065d167738bf2bd8f870 to your computer and use it in GitHub Desktop.
fast-json-stringify stream collection
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!DOCTYPE html> | |
<html lang="en"> | |
<head> | |
<meta charset="UTF-8"> | |
<title>Hello World Streaming JSON Client</title> | |
<script src="https://rawgit.com/jimhigson/oboe.js/master/dist/oboe-browser.min.js"></script> | |
</head> | |
<body> | |
<button type="button"></button> | |
<script> | |
const counter = { value: 0, done: false } | |
const renderNext = (data, n) => () => { | |
document.querySelector('button').innerText = `press to stop ${n}` | |
document.body.insertAdjacentHTML('beforeEnd', `<p>${data.hello}</p>`) | |
} | |
oboe(`/api/http${window.location.search}`) | |
.node({ | |
'![*]': function (data) { | |
window.requestAnimationFrame(renderNext(data, counter.value += 1)) | |
if (counter.done === true) { | |
return this.abort() | |
} | |
// clear it from memory since we don't have a #done | |
return oboe.drop; | |
} | |
}) | |
document.querySelector('button').addEventListener('click', () => counter.done = true) | |
</script> | |
</body> | |
</html> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const app = require('express')() | |
const fastJson = require('fast-json-stringify') | |
const faker = require('faker') | |
const a = require('awaiting') | |
const ecstatic = require('ecstatic') | |
const map = require('./map') | |
const take = require('./take') | |
const pipe = require('./pipe') | |
// custom generator for names | |
// also works with array thunks | |
// const names = () => [{ hello: 'Darth Vader' }, { hello: 'Luke Skywalker' }] | |
const names = function * () { | |
while (true) { | |
const hello = faker.name.findName() | |
yield { hello } | |
} | |
} | |
// our json schema | |
const schema = { | |
out: { | |
type: 'object', | |
properties: { | |
hello: { type: 'string' } | |
} | |
} | |
} | |
const createPump = (req, res) => { | |
return async g => { | |
let ready | |
let connected = true | |
while (connected) { | |
req.connection.removeAllListeners('close') | |
let { value, done } = g.next() | |
if (done === true) { | |
return true | |
} | |
ready = res.write(value) | |
// check for backpressure or closed connection | |
if (ready === false) { | |
await Promise.race([ | |
a.event(res, 'drain'), | |
a.event(req.connection, 'close').then(() => (connected = false)) | |
]) | |
} | |
} | |
} | |
} | |
// reuseable stringifier | |
const stringify = fastJson(schema.out) | |
const formatJSONStream = (val, i) => `${i === 0 ? '[\n' : '\n,\n'}${val}` | |
const JSONStream = pipe(take(names), map(stringify), map(formatJSONStream)) | |
const theWholeWorld = 7.5 * 1e9 | |
app.get('/api/http', (req, res, next) => { | |
const { limit = theWholeWorld } = req.query | |
res.set('Content-Type', 'application/stream+json') | |
const pump = createPump(req, res) | |
const limitFn = (x, i) => i === +limit | |
pump(JSONStream(limitFn)) | |
.then(finished => { | |
res.end('\n]\n') | |
console.log({ finished }) | |
}) | |
.catch(next) | |
}) | |
app.use(ecstatic({ root: `${__dirname}/public` })) | |
app.use(require('./src/handle-error')) | |
app.listen(4000) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Alternate method that uses Transducers instead of the custom map/take fns | |
// Only issue is that collections always end with trailing comma because | |
// the map() does not provide index | |
// Did not seem to work with an Array the way the original did | |
const app = require('express')() | |
const fastJson = require('fast-json-stringify') | |
const faker = require('faker') | |
const a = require('awaiting') | |
const ecstatic = require('ecstatic') | |
const pipe = require('./src/pipe') | |
const { map, take } = require('transducers') | |
const names = { | |
* hello () { | |
while (true) { | |
const hello = faker.name.findName() | |
yield { hello } | |
} | |
} | |
} | |
// our json schema | |
const schema = { | |
out: { | |
type: 'object', | |
properties: { | |
hello: { type: 'string' } | |
} | |
} | |
} | |
const createPump = (req, res) => { | |
return async g => { | |
let ready | |
let connected = true | |
while (connected) { | |
req.connection.removeAllListeners('close') | |
let { value, done } = g.next() | |
if (done === true) { | |
return true | |
} | |
ready = res.write(value) | |
// check for backpressure or closed connection | |
if (ready === false) { | |
await Promise.race([ | |
a.event(res, 'drain'), | |
a.event(req.connection, 'close').then(() => (connected = false)) | |
]) | |
} | |
} | |
} | |
} | |
// reuseable stringifier | |
const stringify = fastJson(schema.out) | |
const formatJSONStream = val => `\n${val}\n,` | |
const transform = pipe(map(stringify), map(formatJSONStream)) | |
const theWholeWorld = 7.5 * 1e9 | |
app.get('/api/http', (req, res, next) => { | |
const { limit = theWholeWorld } = req.query | |
res.set('Content-Type', 'application/stream+json') | |
const pump = createPump(req, res) | |
res.write(`[`) | |
pump(transform(take(limit)(names.hello()))) | |
.then(finished => { | |
res.end('\n]\n') | |
console.log({ finished }) | |
}) | |
.catch(next) | |
}) | |
app.use(ecstatic({ root: `${__dirname}/public` })) | |
app.use(require('./src/handle-error')) | |
app.listen(4000) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const app = require('express')() | |
const fastJson = require('fast-json-stringify') | |
const faker = require('faker') | |
const a = require('awaiting') | |
const ecstatic = require('ecstatic') | |
const pipe = require('./src/pipe') | |
const map = require('./src/map') | |
const take = require('./src/take') | |
const names = { | |
* hello () { | |
while (true) { | |
const hello = faker.name.findName() | |
yield { hello } | |
} | |
} | |
} | |
const json = { | |
* stream (g) { | |
yield `[\n${g.next().value}` | |
for (const value of g) { | |
yield `\n,\n${value}` | |
} | |
yield '\n]\n' | |
} | |
} | |
// our json schema | |
const schema = { | |
out: { | |
type: 'object', | |
properties: { | |
hello: { type: 'string' } | |
} | |
} | |
} | |
const createPump = (req, res) => { | |
return async g => { | |
let ready | |
let connected = true | |
while (connected) { | |
req.connection.removeAllListeners('close') | |
let { value, done } = g.next() | |
if (done === true) { | |
return true | |
} | |
ready = res.write(value) | |
if (ready === false) { | |
await Promise.race([ | |
a.event(res, 'drain'), | |
a.event(req.connection, 'close').then(() => (connected = false)) | |
]) | |
} | |
} | |
} | |
} | |
const stringify = fastJson(schema.out) | |
const process = pipe(take(names.hello), map(stringify), json.stream) | |
const theWholeWorld = 7.5 * 1e9 | |
app.get('/api/http', (req, res, next) => { | |
const { limit = theWholeWorld } = req.query | |
res.set('Content-Type', 'application/stream+json') | |
const pump = createPump(req, res) | |
pump(process(limit)) | |
.then(finished => { | |
res.end() | |
console.log({ finished }) | |
}) | |
.catch(next) | |
}) | |
app.use(ecstatic({ root: `${__dirname}/public` })) | |
app.use(require('./src/handle-error')) | |
app.listen(4000) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module.exports = fn => | |
function * (iterable) { | |
let i = 0 | |
for (const x of iterable) { | |
yield fn(x, i++) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"scripts": { | |
"start": "node ./index.js" | |
}, | |
"dependencies": { | |
"awaiting": "^2.2.0", | |
"callsite-record": "^4.0.0", | |
"ecstatic": "^2.1.0", | |
"express": "^4.15.2", | |
"faker": "^4.1.0", | |
"fast-json-stringify": "^0.11.0" | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module.exports = (head, ...tail) => (...args) => | |
tail.reduce((value, fn) => fn(value), head(...args)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const never = () => false | |
module.exports = iterableFn => | |
function * (fn = never) { | |
let iterable = iterableFn() | |
let i = 0 | |
for (const x of iterable) { | |
if (fn(x, i)) { | |
break | |
} | |
yield x | |
i++ | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks to @mcollina for a little guidance here.
Still a WIP. But I think I'm getting closer (and learning a lot).
sendAll
would hang at awaiting for the drain event. With thePromise.race
looking for either a drain or a closed connection, I can prove it finishes properly which should release the memory.Promise.race
caused once handlers to get added but were not removed. So, multiple drain events would hitmaxlisteners
error forclose
event listeners.Now theI only need to remove the.then()
for adrain
unregisters allclose
and vice versa. Not sure if this is better in the.then()
or if it should be in theif (ready === false)
block to remove both. Sinceawaiting
hides the callback used, I can't remove the exact listener, hence the use ofremoveAllListeners
. I searchedexpress
's code and couldn't find anything usingclose
ordrain
events. However, this could have a negative effect on other middleware that may need these events. I may refactor to removeawaiting
so I can safely remove individual listeners.close
listener when a drain happens, so that's done at the top of thewhile
loop now and I ignore cleaning up thedrain
listener when we close (since we're closed and going to return--but if this doesn't release memory I'll add it back).I decided to re-initializesendAll
in the handler so we can applyoptions
to the creation of the iterable. This was useful for sending?limit=10
during testing.sendAll
intopump
. This did a few of things:pump
more generic so it could be used for different serializersremoveAllListeners
is calledpump
resolves totrue
if the iterator finished which is helpful in determining what to send if the same client connects again..then()
or.catch()
.pipe
helps compose the generators and iterators into a lazy process that only initializes the iterator (insidetake
) when the resulting generator is invoked.map
allows me to write single-responsibility functions that only understand values and index and operate on the lazily-evaluated values of the generator.json.stream
could be a generator that takes the result ofpipe
ing the other functions, which means it can come last in thepipe
. By going back to the hand-rolled version oftake
, I can put the entire process together outside of the request listener. This also simplifiesmap
so it no longer needs the index value. I also don't need to send the final]
when ending the stream.