Skip to content

Instantly share code, notes, and snippets.

@jhurliman
Created March 12, 2021 08:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jhurliman/d72dec9c61760f46abf60dc1a93c081b to your computer and use it in GitHub Desktop.
Save jhurliman/d72dec9c61760f46abf60dc1a93c081b to your computer and use it in GitHub Desktop.
RosTcpMessageStream.ts
import { TransformStream, TransformerTransformCallback } from "web-streams-polyfill/ponyfill";
class TcpMessageStreamImpl {
private _inMessage = false;
private _bytesNeeded = 4;
private _chunks: Uint8Array[] = [];
transform: TransformerTransformCallback<Uint8Array, Uint8Array> = (chunk, controller) => {
let idx = 0;
while (idx < chunk.length) {
if (chunk.length - idx < this._bytesNeeded) {
// If we didn't receive enough bytes to complete the current message or
// message length field, store this chunk and continue on
this._chunks.push(new Uint8Array(chunk, idx));
this._bytesNeeded -= chunk.length - idx;
return;
}
// Store the final chunk needed to complete the current message or message
// length field
this._chunks.push(new Uint8Array(chunk, idx, this._bytesNeeded));
idx += this._bytesNeeded;
const payload = TcpMessageStreamImpl.ConcatData(this._chunks);
if (this._inMessage) {
// Produce a Uint8Array representing a single message and transition to
// reading a message length field
this._inMessage = false;
this._bytesNeeded = 4;
controller.enqueue(payload);
} else {
// Decoded the message length field and transition to reading a message
this._inMessage = true;
this._bytesNeeded = new DataView(payload).getUint32(0, true);
}
}
};
static ConcatData(chunks: Uint8Array[]): Uint8Array {
if (chunks.length === 1) {
return chunks[0];
}
const totalLength = chunks.reduce((len, chunk) => len + chunk.length, 0);
if (totalLength === 0) {
return new Uint8Array();
}
const result = new Uint8Array(totalLength);
let idx = 0;
chunks.forEach((chunk) => {
result.set(chunk, idx);
idx += chunk.length;
});
return result;
}
}
export class TcpMessageStream extends TransformStream<Uint8Array, Uint8Array> {
constructor() {
super(new TcpMessageStreamImpl());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment