Skip to content

Instantly share code, notes, and snippets.

@simolus3
Created May 23, 2020 08:44
Show Gist options
  • Save simolus3/3c042445dfd12bdd6eff01a2f68aaeed to your computer and use it in GitHub Desktop.
Save simolus3/3c042445dfd12bdd6eff01a2f68aaeed to your computer and use it in GitHub Desktop.
import 'dart:math';
import 'dart:async';
import 'dart:typed_data';
const _int32Length = 32 ~/ 8;
final _encodeInts = ByteData(_int32Length);
extension ReadLengthPrefixed on Stream<List<int>> {
// Note: Assumes that length bytes aren't splitted across multiple packets
Stream<Uint8List> transformLengthPrefixed() async* {
Uint8List currentChunk;
int currentChunkUsed;
await for (final element in this) {
var offsetInElement = 0;
// go through all bytes in the current message
while (offsetInElement < element.length) {
// If there's no chunk, we're expecting a new one, which starts with the length
if (currentChunk == null) {
// Extract length from the first 4 bytes
_encodeInts.buffer.asUint8List().setAll(0,
element.sublist(offsetInElement, offsetInElement + _int32Length));
final length = _encodeInts.getUint32(0);
currentChunk = Uint8List(length);
currentChunkUsed = 0;
// Account for the bytes consumed to read the message length
offsetInElement += _int32Length;
if (offsetInElement >= element.length) {
break; // Done with the current message
}
}
// Try to fill current chunk
final available = min(
currentChunk.length - currentChunkUsed,
element.length - offsetInElement,
);
currentChunk.setAll(currentChunkUsed,
element.sublist(offsetInElement, offsetInElement + available));
currentChunkUsed += available;
offsetInElement += available;
if (currentChunkUsed == currentChunk.length) {
yield currentChunk;
currentChunk = null;
currentChunkUsed = null;
}
}
}
}
}
extension SendLengthPrefixed on StreamSink<List<int>> {
void addWithLength(List<int> data) {
_encodeInts.setUint32(0, data.length);
add(_encodeInts.buffer.asUint8List());
add(data);
}
}
void main() {
Stream<List<int>> entries = Stream.fromIterable([
[
0, 0, 0, 0, // first message (empty)
0, 0, 0, 1, 64, // second message ([64])
0, 0, 0, 3 // length part of the third message (length = 3)
],
[1, 2, 3], // content of the third message
]);
entries.transformLengthPrefixed().forEach(print);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment