Skip to content

Instantly share code, notes, and snippets.

@jclab-joseph
Last active September 15, 2022 00:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jclab-joseph/d4258a4f8c2d19c15ccc8dfb27540f16 to your computer and use it in GitHub Desktop.
Save jclab-joseph/d4258a4f8c2d19c15ccc8dfb27540f16 to your computer and use it in GitHub Desktop.
node stream backpressure test
const streams = require('stream');
const StreamLimiter = require('throttle');
const uuid = require('uuid');
/**
 * Loopback duplex used to exercise stream backpressure: every chunk written
 * to the writable side is pushed straight onto the readable side.
 *
 * When `push()` reports that the readable buffer is full, the write's
 * completion callback is parked in `nextWrite` and only invoked on the next
 * `_read()` call, so the writer is held back until the reader asks for data.
 */
class TestDuplex extends streams.Duplex {
  /**
   * Completion callback of the write that is currently parked waiting for
   * the readable side to be read, or `null` when no write is parked.
   */
  nextWrite = null;

  constructor() {
    super({
      highWaterMark: 128,
    });
  }

  /**
   * Called by the stream machinery when the consumer wants more data.
   * Releases the parked write, if any.
   *
   * @param {number} size - Advisory read size (unused).
   */
  _read(size) {
    const resume = this.nextWrite;
    if (!resume) {
      return;
    }
    // Clear the slot BEFORE invoking the callback. Completing a write can
    // synchronously dispatch the next buffered chunk to _write(), which may
    // park a new callback in this.nextWrite; clearing afterwards would wipe
    // that new callback and stall the stream forever.
    this.nextWrite = null;
    resume();
  }

  /**
   * Forwards a written chunk to the readable side.
   *
   * @param {Buffer|string} chunk - Data handed over by the writable side.
   * @param {string} encoding - Encoding of `chunk` (unused).
   * @param {Function} callback - Signals that this write has completed.
   */
  _write(chunk, encoding, callback) {
    if (this.push(chunk)) {
      callback();
      return;
    }
    // push() returned false: hold the completion until _read() is called.
    console.log('internal buffer is full');
    this.nextWrite = callback;
  }
}
// Stream under test, plus a FIFO of every byte written to it so the
// consumer can verify that data comes out in the order it went in.
const testDuplex = new TestDuplex();
const deque = [];

/**
 * Writes one random UUID string to the stream and records its bytes in
 * `deque`. Only one write is in flight at a time: the next chunk is
 * scheduled from the completion callback of the current write.
 */
const doit = () => {
  const payload = Buffer.from(uuid.v4());
  for (const byte of payload) {
    deque.push(byte);
  }
  const scheduleNext = () => {
    setImmediate(doit);
  };
  testDuplex.write(payload, scheduleNext);
};

doit();
// Drain the duplex through a rate limiter and compare every received byte
// with the oldest byte still recorded in `deque`; any mismatch aborts.
const limiter = new StreamLimiter(128 * 8);
testDuplex.pipe(limiter).on('data', (data) => {
  for (const actual of data) {
    const expected = deque.shift();
    if (actual === expected) {
      continue;
    }
    throw new Error(`WRONG DATA ${actual} !== ${expected}`);
  }
});
@jclab-joseph
Copy link
Author

WARNING

DO NOT CALL write() AGAIN BEFORE THE PREVIOUS WRITE IS DONE.

THE CODE BELOW DOES NOT WORK. (this.nextWrite is overwritten.)

// Variant that issues two writes back-to-back without waiting for the first
// one to complete, and schedules the next round once both callbacks have
// fired. The author reports that this pattern does not work with TestDuplex.
const doit = () => {
  const writesPerRound = 2;
  let completed = 0;

  for (let round = 0; round < writesPerRound; round += 1) {
    const payload = Buffer.from(uuid.v4());
    for (const byte of payload) {
      deque.push(byte);
    }
    testDuplex.write(payload, () => {
      completed += 1;
      if (completed !== 2) {
        return;
      }
      setImmediate(doit);
    });
  }
};
setImmediate(doit);

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment