Skip to content

Instantly share code, notes, and snippets.

@bewt85
Last active September 11, 2017 09:55
Show Gist options
  • Save bewt85/9122f00ee19d31e7d84f65c5446b8bfb to your computer and use it in GitHub Desktop.
Save bewt85/9122f00ee19d31e7d84f65c5446b8bfb to your computer and use it in GitHub Desktop.
const { Readable } = require('stream');
const logger = require("debug");
const _ = require("lodash");
const whenHttpRequestComplete = new Promise(resolve => setTimeout(resolve, _.random(100, 1000)));
class SlowNumberSource extends Readable {
constructor(options={}) {
super(options);
this.getNext = this.buildNext(1);
}
buildNext(i) {
return new Promise(resolve => {
setTimeout(() => {
if (i > 5) resolve([i, null]);
else resolve([`${i}\n`, this.buildNext(i+1)]);
}, 200)
});
}
_read() {
this.getNext
.then(([data, next]) => {
if (next !== null) {
this.getNext = next;
this.push(data);
} else {
this.push(null);
}
})
.catch(logger("error"))
}
}
const es = require("event-stream");
const { Writable } = require('stream');
class LineFormatter {
constructor() {
this.lineNumber = 0;
this.mapper = es.map((data, cb) => {
this.lineNumber += 1;
if (data === "") cb();
cb(null, `Line ${this.lineNumber}: ${data}\n`);
})
}
}
class LineAdder extends Writable {
constructor(options={}) {
options.objectMode = true;
super(options);
this._total = 0;
this.total = new Promise(resolve => {
this.on("finish", () => resolve(this._total))
})
}
_write(line, enc, cb) {
try {
this._total += Number(line.trim());
cb(null);
} catch (err) {
cb(err);
}
}
}
const inputStream = (new SlowNumberSource())
.pipe(es.split())
const formatter = new LineFormatter();
const adder = new LineAdder();
inputStream
.pipe(formatter.mapper)
.pipe(process.stdout);
whenHttpRequestComplete
.then(() => inputStream.pipe(adder).total)
.then(total => console.log(`Total: ${total}`))
$ node example_stream_bug.js
Line 1: 1
Line 2: 2
Line 3: 3
Line 4: 4
Line 5: 5
Total: 14
$ node example_stream_bug.js
Line 1: 1
Line 2: 2
Line 3: 3
Line 4: 4
Line 5: 5
Total: 15
$ node example_stream_bug.js
Line 1: 1
Line 2: 2
Line 3: 3
Line 4: 4
Line 5: 5
Total: 5
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment