Skip to content

Instantly share code, notes, and snippets.

@tonitrnel
Created February 16, 2025 11:59
Show Gist options
  • Save tonitrnel/7b235633545959c7ee2b1f8618ca4db5 to your computer and use it in GitHub Desktop.
Save tonitrnel/7b235633545959c7ee2b1f8618ca4db5 to your computer and use it in GitHub Desktop.
从 stream 中读取事件数据包
export class EventStream extends TransformStream<
Uint8Array,
[eventCode: EventCode, data: Uint8Array]
> {
private legacy?: Uint8Array;
private head?: [eventCode: EventCode, dataLength: number, checksum: number];
constructor() {
super({
transform: (chunk, controller) => {
// 将残留数据与本次数据合并
let buffer: Uint8Array = this.legacy
? new Uint8Array(this.legacy.length + chunk.length)
: chunk;
if (this.legacy) {
buffer.set(this.legacy, 0);
buffer.set(chunk, this.legacy.length);
this.legacy = undefined;
}
while (buffer.length > 0) {
// 如果之前已经解析过头部,则直接等待补全事件体数据
if (this.head) {
const [eventCode, dataLength, checksum] = this.head;
if (buffer.length < dataLength) {
// 数据不足,保存当前 buffer 作为残留数据
this.legacy = buffer;
break;
}
// 完整的事件体数据已到齐,截取并发出
const result = buffer.subarray(0, dataLength);
if (crc32(result) !== checksum) {
controller.error(`Checksum mismatch for ${EventCode[eventCode]}`);
} else {
controller.enqueue([eventCode, result]);
}
buffer = buffer.subarray(dataLength);
this.head = undefined;
continue;
}
// 没有等待的头部,至少需要 12 字节的头部数据
if (buffer.length < 12) {
this.legacy = buffer;
break;
}
const head = buffer.subarray(0, 12);
if ([0x10, 0xaa, 0xff].some((v, i) => head[i] !== v)) {
controller.error('Invalid packet input');
return void 0;
}
const view = new DataView(head.buffer, 3, 12);
const eventCode = view.getUint8(0);
const dataLength = view.getUint32(1);
const checksum = view.getUint32(5);
if (buffer.length < dataLength + 12) {
this.head = [eventCode, dataLength, checksum];
this.legacy = buffer.subarray(12);
break;
} else {
const result = buffer.subarray(12, 12 + dataLength);
if (crc32(result) !== checksum) {
controller.error(`Checksum mismatch for ${EventCode[eventCode]}`);
} else {
controller.enqueue([eventCode, result]);
}
buffer = buffer.subarray(12 + dataLength);
}
}
},
flush: (controller) => {
if (this.head || (this.legacy && this.legacy.length > 0)) {
controller.error('EOF');
}
},
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment