Skip to content

Instantly share code, notes, and snippets.

@uzluisf
Last active June 4, 2023 21:12
Show Gist options
  • Save uzluisf/4774bb39a5d2db131393c0b48d43678b to your computer and use it in GitHub Desktop.
Save uzluisf/4774bb39a5d2db131393c0b48d43678b to your computer and use it in GitHub Desktop.
Node.js Transform Stream Example

Node.js Transform Stream Example

The JsonLineParser class implements a transform stream (using the Stream API of Node.js v20.2.0) to parse a file that contains JSON objects separated by newlines (see data.ndjson). It's adapted from Young and Harter's Node.js in Practice book, specifically Technique 31: Implementing a readable stream.

{"id":1,"name":"O Brother, Where Art Thou?"}
{"id":2,"name":"Home for the Holidays"}
{"id":3,"name":"The Firm"}
{"id":4,"name":"Broadcast News"}
// Example driver: streams ./data.ndjson through JsonLineParser and prints the
// `name` field of every parsed record.
const fs = require('fs');
const JsonLineParser = require('./parser');

const filename = './data.ndjson';

// A deliberately tiny highWaterMark makes the file arrive in small pieces, so
// each JSON line spans several chunks and the parser's buffering is exercised.
const readStream = fs.createReadStream(filename, { highWaterMark: 10 });
const parser = new JsonLineParser();

// pipe() does not forward errors from the source, so hand read failures to the
// parser explicitly; they are then reported by the parser's 'error' handler.
readStream.on('error', (err) => {
  parser.destroy(err);
});

readStream
  .pipe(parser)
  .on('object', (obj) => {
    console.log(obj.name);
  })
  .on('error', (err) => {
    console.error(`Failed to process ${filename}: ${err.message}`);
    process.exitCode = 1;
  })
  // The parser also pushes every record to its readable side. Nothing reads
  // that output here, so drain it; otherwise the readable buffer fills up and
  // backpressure stalls the pipeline on larger inputs.
  .resume();
const stream = require('stream');
/**
 * Transform stream that parses newline-delimited JSON (NDJSON).
 *
 * Incoming chunks are buffered until a full line is available. Every
 * non-blank line is parsed with `JSON.parse`; the resulting value is
 * announced through an `'object'` event and its `JSON.stringify` form is
 * pushed to the readable side of the stream.
 *
 * @fires JsonLineParser#object - emitted once per parsed line with the value.
 */
class JsonLineParser extends stream.Transform {
  /**
   * @param {object} [options] - Options forwarded to `stream.Transform`.
   *   `encoding` is always forced to `'utf8'`. The passed object is copied,
   *   never modified.
   */
  constructor(options) {
    super({ ...options, encoding: 'utf8' });
    // Text received so far that has not yet been terminated by a delimiter.
    this._buffer = '';
    this._DELIMITER = '\n';
    // Streaming decoder: holds back an incomplete multi-byte UTF-8 sequence
    // until the rest of its bytes arrive in a later chunk.
    this._decoder = new TextDecoder('utf-8');
  }

  /**
   * Decodes one incoming chunk and parses every line it completes.
   *
   * @param {Buffer|string} chunk - Raw data written to the stream.
   * @param {string} enc - Encoding reported by the stream machinery (unused).
   * @param {function(Error=): void} cb - Called once the chunk is handled;
   *   receives the error if a line is not valid JSON.
   */
  _transform(chunk, enc, cb) {
    try {
      const text =
        typeof chunk === 'string'
          ? chunk
          : this._decoder.decode(chunk, { stream: true });
      this._processLine(text);
      cb();
    } catch (err) {
      cb(err);
    }
  }

  /**
   * Parses whatever is left when the input ends, so a final record that has
   * no trailing newline is not lost.
   *
   * @param {function(Error=): void} cb - Called when flushing is done;
   *   receives the error if the remaining text is not valid JSON.
   */
  _flush(cb) {
    try {
      // decode() without arguments releases any bytes the decoder held back.
      const tail = (this._buffer + this._decoder.decode()).trim();
      this._buffer = '';
      if (tail) {
        this._emitLine(tail);
      }
      cb();
    } catch (err) {
      cb(err);
    }
  }

  /**
   * Appends text to the internal buffer and parses every complete line in it.
   * Blank (whitespace-only) lines are skipped.
   *
   * @param {string} chunk - Decoded text to append.
   * @throws {SyntaxError} If a complete line is not valid JSON.
   */
  _processLine(chunk) {
    this._buffer += chunk;
    let delimiterAt = this._buffer.indexOf(this._DELIMITER);
    while (delimiterAt !== -1) {
      const line = this._buffer.slice(0, delimiterAt).trim();
      // Consume the line before parsing so the buffer stays consistent even
      // when JSON.parse throws.
      this._buffer = this._buffer.slice(delimiterAt + this._DELIMITER.length);
      if (line) {
        this._emitLine(line);
      }
      delimiterAt = this._buffer.indexOf(this._DELIMITER);
    }
  }

  /**
   * Parses a single non-empty line, emits it and pushes its serialized form.
   *
   * @param {string} line - One trimmed, non-empty line of input.
   * @throws {SyntaxError} If the line is not valid JSON.
   */
  _emitLine(line) {
    const result = JSON.parse(line);
    this.emit('object', result);
    this.push(JSON.stringify(result));
  }
}
module.exports = JsonLineParser;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment