Skip to content

Instantly share code, notes, and snippets.

@FranckFreiburger
Last active November 2, 2022 07:20
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save FranckFreiburger/9af693b0432d7ee85d4e360e524551dc to your computer and use it in GitHub Desktop.
Save FranckFreiburger/9af693b0432d7ee85d4e360e524551dc to your computer and use it in GitHub Desktop.
nodejs readable and writable stream to duplex stream
import { Duplex } from 'stream'
export default class Duplexify extends Duplex {
constructor(writable, readable) {
super({
readableObjectMode: readable.readableObjectMode,
writableObjectMode: writable.writableObjectMode,
readableHighWaterMark: readable.readableHighWaterMark,
writableHighWaterMark: writable.writableHighWaterMark,
});
this._writable = writable;
this._readable = readable;
// doc: The readable.read() method should only be called on Readable streams operating in paused mode.
readable.pause();
this.once('finish', () => writable.end());
// handle end of stream
writable.on('finish', () => void this.end());
readable.on('end', () => void this.push(null));
// forward errors (unlike pipe())
writable.on('error', err => void this.emit('error', err));
readable.on('error', err => void this.emit('error', err));
}
_destroy(err) {
this._writable.destroy(err);
this._readable.destroy(err);
}
// stream_writable src: https://github.com/nodejs/node/blob/018c3e8949e925efc8077801d44c2b2feb974750/lib/_stream_writable.js#L269
// src doc: Implement an async ._write(chunk, encoding, cb), and it'll handle all the drain event emission and buffering.
_write(chunk, encoding, callback) {
const ok = this._writable.write(chunk, encoding, () => ok && callback());
if ( !ok )
this._writable.once('drain', callback);
}
// stream_readable src: https://github.com/nodejs/node/blob/master/lib/_stream_readable.js#L378
_read(size) {
const chunk = this._readable.read(size);
if ( chunk !== null )
this.push(chunk);
else
this._readable.once('readable', () => void this._read());
}
}
@FranckFreiburger
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment