@dai-shi
Last active July 20, 2019 13:42
Infinite counter stream with express
// Variant: React (hooks) client
const { Readable } = require('stream');
const express = require('express');

// Creates a Readable that emits one JSON line per `delay` milliseconds.
const createCounterReader = (delay) => {
  let counter = 0;
  const reader = new Readable({
    read() {}, // no-op: data is pushed from the timer below
  });
  setInterval(() => {
    counter += 1;
    const str = JSON.stringify({ counter });
    reader.push(Buffer.from(str + '\n'));
  }, delay);
  return reader;
};

// One shared reader: every connected client sees the same counter.
const counterReader = createCounterReader(1000);

const INDEX_HTML = `
<html>
<head>
  <script crossorigin src="https://unpkg.com/react@16.8.6/umd/react.production.min.js"></script>
  <script crossorigin src="https://unpkg.com/react-dom@16.8.6/umd/react-dom.production.min.js"></script>
</head>
<body>
<h1>Counter</h1>
<div id="app"></div>
<script>
// Splits each incoming chunk (a Uint8Array) on newline bytes (0x0a).
// Assumes every chunk ends with a newline; bytes after the last newline are dropped.
const createLineSplitter = () => new TransformStream({
  transform: (chunk, controller) => {
    let start = 0;
    let end = 0;
    while (end < chunk.length) {
      if (chunk[end] === 0x0a) {
        controller.enqueue(chunk.slice(start, end));
        start = end + 1;
      }
      end += 1;
    }
  },
});

// Hook that returns the latest line received from the streaming endpoint at url.
const useStream = url => {
  const [str, setStr] = React.useState(null);
  React.useEffect(() => {
    const writer = new WritableStream({
      write: (chunk) => {
        setStr(String.fromCharCode.apply('', chunk));
      },
    });
    const abortController = new AbortController();
    const pipeStream = async () => {
      const res = await fetch(url, { signal: abortController.signal });
      // A TransformStream can only be piped through once, so create one per effect run.
      res.body.pipeThrough(createLineSplitter()).pipeTo(writer);
    };
    pipeStream();
    // Abort the request when the component unmounts or url changes.
    const cleanup = () => {
      abortController.abort();
    };
    return cleanup;
  }, [url]);
  return str;
};

const App = () => {
  const str = useStream('/counter');
  return React.createElement('div', {}, str);
};

ReactDOM.render(React.createElement(App), document.getElementById('app'));
</script>
</body></html>
`;

const app = express();
app.get('/', (req, res) => {
  res.send(INDEX_HTML);
});
app.get('/counter', (req, res) => {
  // The response stays open; chunks are written as the reader pushes them.
  counterReader.pipe(res);
});
app.listen(8080);

// Variant: plain JavaScript client (no framework)
const { Readable } = require('stream');
const express = require('express');

// Creates a Readable that emits one JSON line per `delay` milliseconds.
const createCounterReader = (delay) => {
  let counter = 0;
  const reader = new Readable({
    read() {}, // no-op: data is pushed from the timer below
  });
  setInterval(() => {
    counter += 1;
    const str = JSON.stringify({ counter });
    reader.push(Buffer.from(str + '\n'));
  }, delay);
  return reader;
};

// One shared reader: every connected client sees the same counter.
const counterReader = createCounterReader(1000);

const INDEX_HTML = `
<html><body>
<h1>Counter</h1>
<div id="app"></div>
<script>
// Sink: called once per line; shows the latest line on the page.
const writer = new WritableStream({
  write: (chunk) => {
    const str = String.fromCharCode.apply('', chunk);
    document.getElementById('app').innerHTML = str;
  },
});

// Splits each incoming chunk (a Uint8Array) on newline bytes (0x0a).
// Assumes every chunk ends with a newline; bytes after the last newline are dropped.
const transformer = new TransformStream({
  transform: (chunk, controller) => {
    let start = 0;
    let end = 0;
    while (end < chunk.length) {
      if (chunk[end] === 0x0a) {
        controller.enqueue(chunk.slice(start, end));
        start = end + 1;
      }
      end += 1;
    }
  },
});

const main = async () => {
  const res = await fetch('/counter');
  res.body.pipeThrough(transformer).pipeTo(writer);
};
main();
</script>
</body></html>
`;

const app = express();
app.get('/', (req, res) => {
  res.send(INDEX_HTML);
});
app.get('/counter', (req, res) => {
  // The response stays open; chunks are written as the reader pushes them.
  counterReader.pipe(res);
});
app.listen(8080);

// express-stream-vue.js: Vue client
const { Readable } = require('stream');
const express = require('express');

// Creates a Readable that emits one JSON line per `delay` milliseconds.
const createCounterReader = (delay) => {
  let counter = 0;
  const reader = new Readable({
    read() {}, // no-op: data is pushed from the timer below
  });
  setInterval(() => {
    counter += 1;
    const str = JSON.stringify({ counter });
    reader.push(Buffer.from(str + '\n'));
  }, delay);
  return reader;
};

// One shared reader: every connected client sees the same counter.
const counterReader = createCounterReader(1000);

const INDEX_HTML = `
<html>
<head>
  <!-- Pinned to Vue 2: new Vue({ el }) and beforeDestroy are Vue 2 APIs -->
  <script src="https://cdn.jsdelivr.net/npm/vue@2/dist/vue.js"></script>
</head>
<body>
<h1>Counter</h1>
<div id="app">{{ str }}</div>
<script>
// Splits each incoming chunk (a Uint8Array) on newline bytes (0x0a).
// Assumes every chunk ends with a newline; bytes after the last newline are dropped.
const transformer = new TransformStream({
  transform: (chunk, controller) => {
    let start = 0;
    let end = 0;
    while (end < chunk.length) {
      if (chunk[end] === 0x0a) {
        controller.enqueue(chunk.slice(start, end));
        start = end + 1;
      }
      end += 1;
    }
  },
});

const app = new Vue({
  el: '#app',
  data: {
    str: ''
  },
  mounted () {
    // Sink: called once per line; updating this.str re-renders the template.
    const writer = new WritableStream({
      write: (chunk) => {
        this.str = String.fromCharCode.apply('', chunk);
      },
    });
    this.abortController = new AbortController();
    const pipeStream = async () => {
      const res = await fetch('/counter', { signal: this.abortController.signal });
      res.body.pipeThrough(transformer).pipeTo(writer);
    };
    pipeStream();
  },
  beforeDestroy() {
    // Abort the request when the component is torn down.
    this.abortController.abort();
  }
});
</script>
</body></html>
`;

const app = express();
app.get('/', (req, res) => {
  res.send(INDEX_HTML);
});
app.get('/counter', (req, res) => {
  // The response stays open; chunks are written as the reader pushes them.
  counterReader.pipe(res);
});
app.listen(8080);

OctaneInteractive commented Jul 20, 2019

Hi Daishi, and again, much appreciated.

I've not done anything with streams before, so this is as new as it is confusing.

First, where (or how) is the code iterating?

I've moved INDEX_HTML into the Vue component, but the ReadableStream in Response.body has nothing in it, so I'm assuming counterReader.pipe(res) would need to be returned somehow?

Here's the code I have so far — I had to replace fetch() with Axios, as it wasn't triggering the end point.

// Component: "Elastic.vue"

<section class="box" v-for="(asset, assetType) in buildIndices.assets" :key="assetType">
  <b-field>
    <button class="button is-primary is-pulled-right" @click="buildIndicesByAssetType(assetType)">
      <i :class="asset.icon"></i>&nbsp;Build
    </button>
  </b-field>
  <h4 class="title is-4" v-html="asset.name"></h4>
  <b-notification :closable="false">
    <progress class="progress" :value="asset.numberOfIndices" :max="asset.numberOfItems">{{asset.numberOfIndices}} {{asset.name}}</progress>
    <p>{{asset.numberOfItems}} {{asset.name}} in total</p>
  </b-notification>
</section>

// Logic for "buildIndicesByAssetType()"...

const writer = new WritableStream({
  write: (chunk) => {
    const str = String.fromCharCode.apply('', chunk);
    self.buildIndices.assets[typeOfAsset].numberOfIndices = str

  },
});
const transformer = new TransformStream({
  transform: (chunk, controller) => {
    let start = 0;
    let end = 0;
    while (end < chunk.length) {
      if (chunk[end] === 0x0a) {
        controller.enqueue(chunk.slice(start, end));
        start = end + 1;
      }
      end += 1;
    }
  },
});

this.$axios.get(`/elastic/indices/${typeOfAsset}/build`)
  .then(function(response) {
    console.log("Elastic:buildIndicesByAssetType:success", response)

    response.data.pipeThrough(transformer).pipeTo(writer)
  })

// Route: "elastic.js"

router.get('/elastic/indices/:typeOfAsset/build', async (req, res) => {

  const createCounterReader = (delay) => {
    let counter = 0;
    const reader = new Readable({
      read() {},
    });
    setInterval(() => {
      counter += 1;
      const str = JSON.stringify({ counter });
      reader.push(Buffer.from(str + '\n'));
    }, delay);
    return reader;
  };
  
  const counterReader = createCounterReader(1000);

  counterReader.pipe(res);

})

However, while this is iterating, it's not returning anything to the component in Vue (in the Network tab for DevTools, I see: "CAUTION: request is not finished yet!").

dai-shi commented Jul 20, 2019

Hi, you might want to start from the working code and change it toward your final goal little by little.
To use a stream in axios, you need to specify responseType; check out the docs.

First, where (or how) is the code iterating?

Streams are like an event system, so it's not really iterating: the callbacks run each time a chunk arrives.
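
To make the event-style behavior concrete, here is a minimal sketch using Node's `stream.Readable`. The helpers `createFiniteCounterReader` and `collectChunks` are hypothetical names introduced for illustration (they are not in the gist), and the reader is finite only so that the example terminates.

```javascript
const { Readable } = require('stream');

// Hypothetical helper (not part of the gist): emits `limit` counter lines
// and then ends, so the example finishes on its own.
const createFiniteCounterReader = (limit) => {
  const reader = new Readable({ read() {} });
  for (let counter = 1; counter <= limit; counter += 1) {
    reader.push(Buffer.from(JSON.stringify({ counter }) + '\n'));
  }
  reader.push(null); // signal the end of the stream
  return reader;
};

// Hypothetical helper: gathers chunks through 'data' events. Nothing here
// loops over the stream; the handler runs whenever a chunk is available.
const collectChunks = (reader) => new Promise((resolve, reject) => {
  const received = [];
  reader.on('data', (chunk) => received.push(chunk.toString()));
  reader.on('end', () => resolve(received));
  reader.on('error', reject);
});
```

Calling `collectChunks(createFiniteCounterReader(3))` resolves once the `'end'` event fires. In the gist the reader never ends, which is why the browser keeps the request open.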

dai-shi commented Jul 20, 2019

@OctaneInteractive Just added express-stream-vue.js for reference.


OctaneInteractive commented Jul 20, 2019

Hi!

I had tried responseType: 'stream' this morning, but got an error:

The provided value 'stream' is not a valid enum value of type XMLHttpRequestResponseType.

... and an investigation revealed an open issue in the Axios repo with no fix.

But, after an inspection of the code, I got it working.

Daishi, thank you!

dai-shi commented Jul 20, 2019

Oh, really. The axios README is so confusing...
You need to use fetch, then. Yeah, I remember that access to the response body as a stream is one of the benefits of using fetch over XHR.
Good luck!
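
For reference, a minimal sketch of reading a newline-delimited response with fetch. It assumes the endpoint sends one JSON object per line, as `/counter` does in the gist. `createLineBuffer` and `readLines` are hypothetical names for illustration. Unlike the gist's transformer, this version keeps a trailing partial line until the next chunk arrives, since chunk boundaries need not line up with newlines.

```javascript
// Hypothetical helper: returns a function that accepts decoded text and
// returns the complete lines seen so far, holding back any partial line.
const createLineBuffer = () => {
  let rest = '';
  return (text) => {
    const parts = (rest + text).split('\n');
    rest = parts.pop(); // incomplete tail ('' if the text ended on a newline)
    return parts;
  };
};

// Hypothetical helper: streams `url` with fetch and calls `onLine` for each
// complete line. Pass an AbortSignal as `signal` to cancel the request.
const readLines = async (url, onLine, signal) => {
  const res = await fetch(url, { signal });
  const reader = res.body.getReader();
  const decoder = new TextDecoder();
  const pushText = createLineBuffer();
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    // { stream: true } keeps multi-byte characters intact across chunks.
    pushText(decoder.decode(value, { stream: true })).forEach(onLine);
  }
};
```

In the browser this could be called as `readLines('/counter', (line) => console.log(JSON.parse(line).counter))`.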
