Skip to content

Instantly share code, notes, and snippets.

@breezewish
Last active January 9, 2024 04:27
Show Gist options
  • Save breezewish/f061a60f2b0dfacca05d71688d532026 to your computer and use it in GitHub Desktop.
Save breezewish/f061a60f2b0dfacca05d71688d532026 to your computer and use it in GitHub Desktop.
Protobuf.js 6+ Stream Decoder
import through2 from 'through2';
import { BufferReader } from 'protobufjs';
function decodeProtobuf(decodeFunc, msgMaxSize = 20 * 1000 * 1000) {
const buffer = Buffer.alloc(msgMaxSize);
const reader = new BufferReader(buffer);
let bufferLen = 0; // The length of valid data in buffer, may contain multiple messages
reader.pos = 0;
let hasError = false;
return through2.obj(function (chunk, enc, callback) {
if (hasError) {
return callback();
}
chunk.copy(buffer, bufferLen);
bufferLen += chunk.length;
while (reader.pos < bufferLen) {
// Backup read position in case of decode failures.
const pos = reader.pos;
let messageLen = 0;
try {
// Extend reader's view to the whole buffer for decoding prefix.
reader.len = bufferLen;
messageLen = reader.uint32();
} catch (e) {
// Decode length prefix failed, we need to wait for more data.
reader.pos = pos;
break;
}
if (reader.pos + messageLen > reader.len) {
// No enough data. Rewind & try next time.
reader.pos = pos;
break;
}
try {
// Shrink reader's view to the current message, otherwise it will try to decode remaining buffer.
reader.len = reader.pos + messageLen;
const decoded = decodeFunc(reader);
this.push(decoded);
} catch (e) {
// Decode message body failed
hasError = true;
return callback(e);
}
}
// If some data is successfully decoded..
if (reader.pos > 0) {
// If there are remaining data, move to the front.
const len = bufferLen - reader.pos;
if (len > 0) {
buffer.copy(buffer, 0, reader.pos, bufferLen);
}
bufferLen = len;
reader.pos = 0;
}
callback();
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment