Skip to content

Instantly share code, notes, and snippets.

@sandfox
Created July 6, 2021 21:15
Show Gist options
  • Save sandfox/656a17aa0e241d046ac593fc4ad07d97 to your computer and use it in GitHub Desktop.
Save sandfox/656a17aa0e241d046ac593fc4ad07d97 to your computer and use it in GitHub Desktop.
Node.js newline stream splitter
const { Transform } = require("stream");
// Splits a byte stream into lines on the newline byte (0x0A, "\n" in UTF-8)
// and pushes each line downstream as a Buffer, without its trailing newline.
// A line may span any number of input chunks, and a final line that has no
// trailing newline is emitted when the stream ends.
// If a line grows beyond maxLineLengthBytes, the whole line is discarded: all
// input is dropped until the next newline byte, and the line after it is
// processed normally.
// TODO: emit errors on overlong lines?
class NewlineStreamSplitter extends Transform {
  /**
   * @param {object} [streamOpts] - Options passed unchanged to the Transform constructor.
   * @param {object} [opts] - Splitter options.
   * @param {number} [opts.maxLineLengthBytes=262144] - Longest line (in bytes,
   *   excluding the newline) that is emitted; longer lines are dropped.
   *   Falsy values fall back to the default.
   */
  constructor(streamOpts, opts) {
    super(streamOpts);
    const { maxLineLengthBytes } = opts || {};
    // this default aligns with cloudwatch logs max size for a single event
    this._maxLineLengthBytes = maxLineLengthBytes || 256 * 1024;
    // Pieces of the line currently being assembled.
    this._buffers = [];
    // Bytes seen for the current line, including pieces already released
    // because the line went over the limit.
    this._currentSize = 0;
  }

  // True once the current line is longer than the configured maximum.
  _isLimitExceeded() {
    return this._currentSize > this._maxLineLengthBytes;
  }

  /**
   * Splits one input chunk into lines.
   * @param {Buffer} chunk - Raw input bytes.
   * @param {string} encoding - Must be "buffer"; anything else is rejected.
   * @param {function(Error=)} callback - Stream completion callback.
   */
  _transform(chunk, encoding, callback) {
    if (encoding !== "buffer") {
      callback(new Error("This only supports buffer mode streams"));
      return;
    }
    let marker = 0;
    // Consume every byte of the chunk, including a lone byte after the last newline.
    while (marker < chunk.length) {
      // 0x0a is "\n". UTF-8 never uses this byte inside a multi-byte
      // sequence, so a raw byte search cannot split a character.
      const idx = chunk.indexOf(0x0a, marker);
      if (idx === -1) {
        // No newline left: keep the remainder as the start of a line that a
        // later chunk (or _flush) will complete.
        this._appendToLine(chunk.subarray(marker));
        break;
      }
      this._appendToLine(chunk.subarray(marker, idx));
      this._completeLine();
      marker = idx + 1;
    }
    callback();
  }

  // Emits a trailing line that had no terminating newline. Nothing is pushed
  // when the input was empty or ended with a newline.
  _flush(cb) {
    if (this._currentSize > 0) {
      this._completeLine();
    }
    cb();
  }

  // Adds a piece to the current line. Once the line is over the limit it will
  // be dropped anyway, so stored pieces are released right away and only the
  // running size is kept.
  _appendToLine(part) {
    this._currentSize += part.length;
    if (this._isLimitExceeded()) {
      this._buffers = [];
    } else {
      this._buffers.push(part);
    }
  }

  // Called at a line boundary: pushes the line if it is within limits,
  // otherwise discards it. Either way, state is reset for the next line.
  _completeLine() {
    if (this._isLimitExceeded()) {
      this._clear();
    } else {
      this._flushBuffer();
    }
  }

  // drop all internal state
  _clear() {
    this._buffers = [];
    this._currentSize = 0;
  }

  // Pushes the buffered line downstream as a single Buffer and resets state.
  _flushBuffer() {
    this.push(Buffer.concat(this._buffers));
    this._clear();
  }
}
// Public API of this module: the newline-splitting Transform stream class.
module.exports.NewlineStreamSplitter = NewlineStreamSplitter;
const assert = require("assert");
const { NewlineStreamSplitter } = require("./newline-stream-splitter");
// Two "some-data" lines separated by one newline must come out as exactly two
// chunks, each equal to "some-data".
// NOTE(review): the assertions run immediately after end(), so this assumes
// 'data' events and _flush happen synchronously on the Node version in use.
function testSplitter() {
  const line = Buffer.from("some-data", "utf8");
  const input = Buffer.concat([line, Buffer.from("\n", "utf8"), line]);
  const splitter = new NewlineStreamSplitter(undefined, { maxLineLengthBytes: 20 });
  const received = [];
  splitter.on("data", (chunk) => received.push(chunk));
  splitter.write(input);
  splitter.end();
  for (const [i, chunk] of received.entries()) {
    assert.ok(
      Buffer.compare(chunk, line) === 0,
      `expected chunk@${i} to equal "some-data" buffer`
    );
  }
  assert.strictEqual(received.length, 2, "output should have 2 chunks");
}
testSplitter();
// With a 4-byte limit, both the newline-terminated first line and the
// unterminated second line are too long, so nothing may be emitted.
function testSkipsOverlengthLines() {
  const input = Buffer.from("oh noe I am too long \n i am also too long", "utf8");
  const splitter = new NewlineStreamSplitter(undefined, { maxLineLengthBytes: 4 });
  // Any emitted chunk fails the test immediately.
  splitter.on("data", () => assert.fail("no buffers should be emitted"));
  splitter.write(input);
  splitter.end();
}
testSkipsOverlengthLines();
// With a 4-byte limit, the long first line is dropped but the 4-byte "----"
// line after it must still be emitted, exactly once.
function testSkipsOnlyOverLengthLines() {
  const expected = Buffer.from("----", "utf8");
  const splitter = new NewlineStreamSplitter(undefined, { maxLineLengthBytes: 4 });
  const received = [];
  splitter.on("data", (chunk) => received.push(chunk));
  splitter.write(Buffer.from("oh noe I am too long \n----", "utf8"));
  splitter.end();
  for (const chunk of received) {
    assert.ok(
      Buffer.compare(chunk, expected) === 0,
      `expected buffer to equal "----" buffer`
    );
  }
  assert.strictEqual(received.length, 1, "output should have 1 chunks");
}
testSkipsOnlyOverLengthLines();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment