Skip to content

Instantly share code, notes, and snippets.

@dai-shi
Last active July 20, 2019 13:42
Show Gist options
  • Save dai-shi/368af21162fe701dd213f62edf1fdacf to your computer and use it in GitHub Desktop.
Save dai-shi/368af21162fe701dd213f62edf1fdacf to your computer and use it in GitHub Desktop.
Infinite counter stream with express
const { Readable } = require('stream');
const express = require('express');
const createCounterReader = (delay) => {
let counter = 0;
const reader = new Readable({
read() {},
});
setInterval(() => {
counter += 1;
const str = JSON.stringify({ counter });
reader.push(Buffer.from(str + '\n'));
}, delay);
return reader;
};
const counterReader = createCounterReader(1000);
const INDEX_HTML = `
<html>
<head>
<script crossorigin src="https://unpkg.com/react@16.8.6/umd/react.production.min.js"></script>
<script crossorigin src="https://unpkg.com/react-dom@16.8.6/umd/react-dom.production.min.js"></script>
</head>
<body>
<h1>Counter</h1>
<div id="app"></div>
<script>
const transformer = new TransformStream({
transform: (chunk, controller) => {
let start = 0;
let end = 0;
while (end < chunk.length) {
if (chunk[end] === 0x0a) {
controller.enqueue(chunk.slice(start, end));
start = end + 1;
}
end += 1;
}
},
});
const useStream = url => {
const [str, setStr] = React.useState(null);
React.useEffect(() => {
const writer = new WritableStream({
write: (chunk) => {
setStr(String.fromCharCode.apply('', chunk));
},
});
const abortController = new AbortController();
const pipeStream = async () => {
const res = await fetch('/counter', { signal: abortController.signal });
res.body.pipeThrough(transformer).pipeTo(writer);
};
pipeStream();
const cleanup = () => {
abortController.abort();
};
return cleanup;
}, [url]);
return str;
};
const App = () => {
const str = useStream('/counter');
return React.createElement('div', {}, str);
};
ReactDOM.render(React.createElement(App), document.getElementById('app'));
</script>
</body></html>
`;
const app = express();
app.get('/', (req, res) => {
res.send(INDEX_HTML);
});
app.get('/counter', (req, res) => {
counterReader.pipe(res);
});
app.listen(8080);
const { Readable } = require('stream');
const express = require('express');
const createCounterReader = (delay) => {
let counter = 0;
const reader = new Readable({
read() {},
});
setInterval(() => {
counter += 1;
const str = JSON.stringify({ counter });
reader.push(Buffer.from(str + '\n'));
}, delay);
return reader;
};
const counterReader = createCounterReader(1000);
const INDEX_HTML = `
<html><body>
<h1>Counter</h1>
<div id="app"></div>
<script>
const writer = new WritableStream({
write: (chunk) => {
const str = String.fromCharCode.apply('', chunk);
document.getElementById('app').innerHTML = str;
},
});
const transformer = new TransformStream({
transform: (chunk, controller) => {
let start = 0;
let end = 0;
while (end < chunk.length) {
if (chunk[end] === 0x0a) {
controller.enqueue(chunk.slice(start, end));
start = end + 1;
}
end += 1;
}
},
});
const main = async () => {
const res = await fetch('/counter');
res.body.pipeThrough(transformer).pipeTo(writer);
};
main();
</script>
</body></html>
`;
const app = express();
app.get('/', (req, res) => {
res.send(INDEX_HTML);
});
app.get('/counter', (req, res) => {
counterReader.pipe(res);
});
app.listen(8080);
const { Readable } = require('stream');
const express = require('express');
const createCounterReader = (delay) => {
let counter = 0;
const reader = new Readable({
read() {},
});
setInterval(() => {
counter += 1;
const str = JSON.stringify({ counter });
reader.push(Buffer.from(str + '\n'));
}, delay);
return reader;
};
const counterReader = createCounterReader(1000);
const INDEX_HTML = `
<html>
<head>
<script src="https://cdn.jsdelivr.net/npm/vue"></script>
</head>
<body>
<h1>Counter</h1>
<div id="app">{{ str }}</div>
<script>
const transformer = new TransformStream({
transform: (chunk, controller) => {
let start = 0;
let end = 0;
while (end < chunk.length) {
if (chunk[end] === 0x0a) {
controller.enqueue(chunk.slice(start, end));
start = end + 1;
}
end += 1;
}
},
});
const app = new Vue({
el: '#app',
data: {
str: ''
},
mounted () {
const writer = new WritableStream({
write: (chunk) => {
this.str = String.fromCharCode.apply('', chunk);
},
});
this.abortController = new AbortController();
const pipeStream = async () => {
const res = await fetch('/counter', { signal: this.abortController.signal });
res.body.pipeThrough(transformer).pipeTo(writer);
};
pipeStream();
},
beforeDestroy() {
this.abortController.abort();
}
});
</script>
</body></html>
`;
const app = express();
app.get('/', (req, res) => {
res.send(INDEX_HTML);
});
app.get('/counter', (req, res) => {
counterReader.pipe(res);
});
app.listen(8080);
@OctaneInteractive
Copy link

OctaneInteractive commented Jul 20, 2019

Hi!

I had tried: responseType: 'stream', this morning, but got an error:

The provided value 'stream' is not a valid enum value of type XMLHttpRequestResponseType.

... and an investigation revealed an open and issue in the Axios repo with no fix.

But, after an inspection of the code, I got it working.

Daishi, thank you!

@dai-shi
Copy link
Author

dai-shi commented Jul 20, 2019

Oh, really. The axios README is so confusing...
You need to use fetch then. Yeah, I remember the stream api is one of the benefits of using fetch over XHR.
Good luck!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment