Skip to content

Instantly share code, notes, and snippets.

@webstory
Last active December 14, 2018 01:41
Show Gist options
  • Save webstory/ad07a0c2a7f315576f55e82a454ae67e to your computer and use it in GitHub Desktop.
Save webstory/ad07a0c2a7f315576f55e82a454ae67e to your computer and use it in GitHub Desktop.
Node.js 8 object stream
const stream = require('stream');
/**
 * Object-mode Readable that emits the integers 1, 2, ..., max in order and
 * then ends the stream.
 */
class Producer extends stream.Readable {
  /**
   * @param {object} [options] - Readable options merged over the defaults
   *   `{ highWaterMark: 5, objectMode: true }`.
   * @param {number} [options.max=50] - Last integer to emit before ending.
   */
  constructor(options) {
    options = Object.assign({
      highWaterMark: 5,
      objectMode: true
    }, options);
    super(options);
    // Upper bound is configurable; 50 remains the default.
    this.max = options.max !== undefined ? options.max : 50;
    this.index = 1;
  }

  /**
   * Pushes exactly one value per call: the next integer, or `null`
   * (end of stream) once the counter has passed `max`.
   *
   * @param {number} size - Advisory read size; unused.
   */
  _read(size) {
    const i = this.index++;
    if (i > this.max) {
      this.push(null);
    } else {
      this.push(i);
      console.log(`Pushed ${i}`);
    }
  }
}
/**
 * Object-mode Writable that logs each chunk it receives, tagged with the
 * consumer's name, 100 ms after the chunk arrives.
 */
class Consumer extends stream.Writable {
  /**
   * @param {object} [options] - Writable options merged over the defaults
   *   `{ highWaterMark: 5, objectMode: true }`.
   * @param {string} [options.name] - Label used in log lines; falls back
   *   to 'Consumer' when missing or falsy.
   */
  constructor(options) {
    const settings = Object.assign(
      {},
      { highWaterMark: 5, objectMode: true },
      options
    );
    super(settings);
    this.name = settings.name ? settings.name : 'Consumer';
  }

  /**
   * Logs the chunk and acknowledges it after a 100 ms delay.
   *
   * @param {*} chunk - Value to log.
   * @param {string} encoding - Unused.
   * @param {Function} callback - Called once the chunk has been logged.
   */
  _write(chunk, encoding, callback) {
    const report = () => {
      console.log(`[${this.name}] ${chunk}`);
      callback();
    };
    setTimeout(report, 100);
  }

  /**
   * Nothing to clean up; completes immediately.
   *
   * @param {Function} callback - Called straight away.
   */
  _final(callback) {
    callback();
  }
}
/**
 * Object-mode Transform that applies a user-supplied function to every
 * chunk and forwards the result downstream.
 */
class Transformer extends stream.Transform {
  /**
   * @param {object} [options] - Transform options merged over the defaults
   *   `{ highWaterMark: 5, objectMode: true }`.
   * @param {Function} [options.func] - Mapping applied to each chunk;
   *   defaults to the identity function.
   */
  constructor(options) {
    options = Object.assign({
      highWaterMark: 5,
      objectMode: true
    }, options);
    super(options);
    this._fn = options.func || ((a) => a);
    // Prints the effective options when the stage is constructed.
    console.log(options);
  }

  /**
   * Maps one chunk through the configured function and pushes the result.
   * If the function throws, the error is passed to `callback` rather than
   * escaping synchronously from the caller's write()/pipe().
   *
   * @param {*} chunk - Incoming value.
   * @param {string} encoding - Unused.
   * @param {Function} callback - Signals completion, or failure when
   *   called with an error.
   */
  _transform(chunk, encoding, callback) {
    let result;
    try {
      result = this._fn(chunk);
    } catch (err) {
      callback(err);
      return;
    }
    this.push(result);
    callback();
  }

  /**
   * Nothing is buffered, so flushing completes immediately.
   *
   * @param {Function} callback - Called straight away.
   */
  _flush(callback) {
    callback();
  }
}
/**
 * Object-mode PassThrough used as a fan-out point: the script pipes a
 * single Mux instance into more than one consumer.
 */
class Mux extends stream.PassThrough {
  /**
   * @param {object} [options] - PassThrough options merged over the
   *   defaults `{ highWaterMark: 1, objectMode: true }`.
   */
  constructor(options) {
    // Merge caller options over the defaults so that overrides take effect.
    options = Object.assign({
      highWaterMark: 1,
      objectMode: true
    }, options);
    super(options);
  }
}
// Demo wiring: producer -> t1 (multiplies by 10) -> mux -> consumer1 and consumer2.
const producer = new Producer();
const t1 = new Transformer({ func: (value) => value * 10 });
const consumer1 = new Consumer({ name: 'C1' });
const consumer2 = new Consumer({ name: 'C2' });
const mux = new Mux();

producer.pipe(t1);
// To run without the multiplexer, drop this stage and pipe t1 straight
// into consumer1 instead.
t1.pipe(mux);
// Both consumers are attached to the same mux instance.
mux.pipe(consumer1);
mux.pipe(consumer2);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment