Skip to content

Instantly share code, notes, and snippets.

@rdb
Last active January 9, 2022 11:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rdb/f2fa274b839248984b7d6ee330eac09b to your computer and use it in GitHub Desktop.
Save rdb/f2fa274b839248984b7d6ee330eac09b to your computer and use it in GitHub Desktop.
Minimal Node.js-based PStats server implementation
// Message type codes: the first byte of every message handed to
// Client.read(), which uses it to pick the reader for the rest of the payload.
// Frame data for one thread (handled by Client.readDatagram).
const MSGTYPE_DATAGRAM = 0;
// Hello message identifying the client (handled by Client.readHello).
const MSGTYPE_HELLO = 1;
// A series of collector definitions (handled by Client.readDefineCollectors).
const MSGTYPE_DEFINE_COLLECTORS = 2;
// A series of thread names (handled by Client.readDefineThreads).
const MSGTYPE_DEFINE_THREADS = 3;
/**
 * One collector definition, as announced by the client.  Definitions are
 * linked to each other through `parent`; `fullname` flattens that chain into
 * a single colon-separated string.
 */
class CollectorDef {
  constructor() {
    // Defaults used until the client's definition message fills them in.
    Object.assign(this, {
      name: "",
      parent: null,
      suggestedColor: [0, 0, 0],
      sort: -1,
      levelUnits: "",
      suggestedScale: 0.0,
      factor: 1.0,
    });
  }

  /**
   * The name of this collector prefixed by the full name of its parent (if
   * any), separated by a colon.
   */
  get fullname() {
    const owner = this.parent;
    return owner ? owner.fullname + ':' + this.name : this.name;
  }
}
/**
 * Contains all the state specific to a particular client: the collector
 * definitions and thread names it has announced, and the identity it sent in
 * its hello message.
 *
 * Every read* method takes a DatagramIterator-like `source` and consumes
 * exactly the fields belonging to its message, in wire order.
 */
class Client {
  constructor() {
    // Collector definitions indexed by collector index (may be sparse).
    this.collectors = [];
    // Thread names indexed by thread index (may be sparse).
    this.threadNames = [];
    // Identity fields, overwritten by readHello().  They are initialised here
    // so that log lines emitted before the hello arrives do not print
    // "undefined".
    this.hostname = "<unknown>";
    this.progname = "<unknown>";
    this.majorVersion = 0;
    this.minorVersion = 0;
  }

  /**
   * Returns the collector definition stored at `index`, creating an empty
   * placeholder first if none has been defined yet.
   */
  getCollector(index) {
    let collector = this.collectors[index];
    if (!collector) {
      collector = new CollectorDef();
      this.collectors[index] = collector;
    }
    return collector;
  }

  /**
   * Reads a control message. Takes a DatagramIterator.  The first byte is the
   * message type; unknown types are logged and otherwise ignored.
   */
  read(source) {
    const type = source.getUint8();
    switch (type) {
      case MSGTYPE_DATAGRAM:
        this.readDatagram(source);
        break;
      case MSGTYPE_HELLO:
        this.readHello(source);
        break;
      case MSGTYPE_DEFINE_COLLECTORS:
        this.readDefineCollectors(source);
        break;
      case MSGTYPE_DEFINE_THREADS:
        this.readDefineThreads(source);
        break;
      default:
        console.log("Received unknown message type", type, "from client", this.hostname);
        break;
    }
  }

  /**
   * Reads a frame data message from TCP or UDP and prints the six collectors
   * that accumulated the most time in that frame.
   */
  readDatagram(source) {
    const threadIndex = source.getUint16();
    const frameNumber = source.getUint32();

    // Per-collector accumulators for this frame only, indexed by collector
    // index (sparse).
    const data = [];

    // Read the time collector data, a series of start and stop times.  The
    // top bit of the 16-bit index marks a stop event; the low 15 bits are the
    // collector index.
    const timeSize = source.getUint16();
    for (let i = 0; i < timeSize; ++i) {
      const packed = source.getUint16();
      const value = source.getFloat32();
      const index = packed & 0x7fff;
      const isStop = (packed & 0x8000) !== 0;

      let collectorData = data[index];
      if (!collectorData) {
        const collector = this.getCollector(index);
        collectorData = {started: 0, nstarted: 0, time: 0, name: collector.fullname};
        data[index] = collectorData;
      }

      if (!isStop) {
        // Only the outermost start of a nested run sets the start time.
        if (collectorData.nstarted++ === 0) {
          collectorData.started = value;
        }
      } else if (collectorData.nstarted > 0) {
        // Only the stop that closes the outermost start contributes time.
        if (--collectorData.nstarted === 0) {
          collectorData.time += (value - collectorData.started);
        }
      }
      // A stop with no matching start in this frame is ignored.  Letting the
      // nesting counter go negative would make every later start/stop pair
      // of this collector in the frame be dropped as well.
    }

    // Display 6 slowest collectors.  Fall back to the numeric thread index if
    // no name has been received for this thread yet.
    const knownName = this.threadNames[threadIndex];
    const threadName = (knownName !== undefined) ? knownName : `#${threadIndex}`;
    const str = `Slowest 6 collectors of frame ${frameNumber} on thread ${threadName}:`;
    console.log(`\n${str}\n${'-'.repeat(str.length)}`);

    // filter() drops the holes of the sparse array; the sort is descending by
    // accumulated time.
    const ranked = data.filter(Boolean).sort((a, b) => (a.time < b.time) - (a.time > b.time));
    for (const entry of ranked.slice(0, 6)) {
      console.log(entry.name, "took", (entry.time * 1000).toFixed(1), "ms");
    }

    // Now read the level collectors, which display a non-time quantity.  The
    // values are not displayed, but the fields must still be consumed.
    const levelSize = source.getUint16();
    for (let i = 0; i < levelSize; ++i) {
      const index = source.getUint16();
      source.getFloat32();
      // Registers a placeholder definition for collectors not defined yet.
      this.getCollector(index);
    }
  }

  /**
   * Reads a hello message identifying the client from a TCP datagram.  If the
   * message ends after the two strings, the version is taken to be 1.1.
   */
  readHello(source) {
    this.hostname = source.getString();
    this.progname = source.getString();
    if (source.getRemainingSize() === 0) {
      this.majorVersion = 1;
      this.minorVersion = 1;
    } else {
      this.majorVersion = source.getUint16();
      this.minorVersion = source.getUint16();
    }
    console.log(`${this.progname} version ${this.majorVersion}.${this.minorVersion} connected from ${this.hostname}`);
  }

  /**
   * Reads a series of collector definitions from a TCP datagram, filling in
   * (or updating) the definition stored at each transmitted index.
   */
  readDefineCollectors(source) {
    const num = source.getUint16();
    for (let i = 0; i < num; ++i) {
      const index = source.getInt16();
      const collector = this.getCollector(index);
      collector.name = source.getString();

      // A collector that names itself as its parent has no parent.
      const parentIndex = source.getInt16();
      collector.parent = (parentIndex !== index) ? this.getCollector(parentIndex) : null;

      // Update the colour components in place so the array object is reused.
      for (let channel = 0; channel < 3; ++channel) {
        collector.suggestedColor[channel] = source.getFloat32();
      }
      collector.sort = source.getInt16();
      collector.levelUnits = source.getString();
      collector.suggestedScale = source.getFloat32();
      collector.factor = source.getFloat32();
    }
  }

  /**
   * Reads a series of thread definitions from a TCP datagram: the index of
   * the first thread, a count, and then that many consecutive thread names.
   */
  readDefineThreads(source) {
    const firstThreadIndex = source.getUint16();
    const num = source.getUint16();
    for (let i = firstThreadIndex; i < firstThreadIndex + num; ++i) {
      this.threadNames[i] = source.getString();
    }
  }
}
// Only Client is exported; CollectorDef and the MSGTYPE_* constants are not.
module.exports = {Client};
/**
 * Convenience classes for encoding and decoding binary data.
 *
 * Datagram is the writer: each add* method appends one little-endian encoded
 * value to the end of `data`.  Appending always builds a new Buffer, so a
 * previously obtained reference to `data` is never modified.
 */
class Datagram {
  /**
   * @param {Buffer|Uint8Array|Datagram} [data] Optional initial contents,
   *   which are copied.  When omitted the datagram starts out empty.
   */
  constructor(data) {
    if (data == null) {
      this.data = Buffer.alloc(0);
    } else {
      // Accept another Datagram as well as raw bytes.
      this.data = Buffer.from(data.data || data);
    }
  }

  /** Discards everything written so far. */
  clear() {
    this.data = Buffer.alloc(0);
  }

  /** Appends a boolean as a single byte: 1 for truthy, 0 for falsy. */
  addBool(value) {
    this.addUint8(value ? 1 : 0);
  }

  /** Appends a signed 8-bit integer. */
  addInt8(value) {
    const buf = Buffer.allocUnsafe(1);
    buf.writeInt8(value);
    this.data = Buffer.concat([this.data, buf]);
  }

  /** Appends an unsigned 8-bit integer. */
  addUint8(value) {
    const buf = Buffer.allocUnsafe(1);
    buf.writeUInt8(value);
    this.data = Buffer.concat([this.data, buf]);
  }

  /** Appends a signed 16-bit little-endian integer. */
  addInt16(value) {
    const buf = Buffer.allocUnsafe(2);
    buf.writeInt16LE(value);
    this.data = Buffer.concat([this.data, buf]);
  }

  /** Appends an unsigned 16-bit little-endian integer. */
  addUint16(value) {
    const buf = Buffer.allocUnsafe(2);
    buf.writeUInt16LE(value);
    this.data = Buffer.concat([this.data, buf]);
  }

  /** Appends a signed 32-bit little-endian integer. */
  addInt32(value) {
    const buf = Buffer.allocUnsafe(4);
    buf.writeInt32LE(value);
    this.data = Buffer.concat([this.data, buf]);
  }

  /** Appends an unsigned 32-bit little-endian integer. */
  addUint32(value) {
    const buf = Buffer.allocUnsafe(4);
    buf.writeUInt32LE(value);
    this.data = Buffer.concat([this.data, buf]);
  }

  /**
   * Appends a 32-bit little-endian float; the counterpart of
   * DatagramIterator.getFloat32().
   */
  addFloat32(value) {
    const buf = Buffer.allocUnsafe(4);
    buf.writeFloatLE(value);
    this.data = Buffer.concat([this.data, buf]);
  }

  /**
   * Appends a string as an unsigned 16-bit little-endian byte count followed
   * by the encoded bytes.  Throws a RangeError (from writeUInt16LE) if the
   * encoded string is longer than 65535 bytes.
   */
  addString(value, encoding = 'utf8') {
    const str = Buffer.from(value, encoding);
    const buf = Buffer.allocUnsafe(2);
    buf.writeUInt16LE(str.length);
    this.data = Buffer.concat([this.data, buf, str]);
  }

  /** Number of bytes written so far. */
  get length() {
    return this.data.length;
  }
}
/**
 * Sequential reader over binary data.  Each get* method decodes one
 * little-endian value at the current offset and advances past it.  The
 * fixed-width getters rely on the underlying Buffer read methods, which throw
 * a RangeError when the read would run past the end of the data.
 */
class DatagramIterator {
  /**
   * @param {Buffer|Datagram} data The bytes to read, or an object whose
   *   `data` property holds them.
   * @param {number} [offset=0] Byte position at which reading starts.
   */
  constructor(data, offset = 0) {
    this.data = data.data || data;
    this.offset = offset;
  }

  /** Reads one byte and returns true if it is non-zero. */
  getBool() {
    const value = this.data.readUInt8(this.offset);
    this.offset += 1;
    return value !== 0;
  }

  /** Reads a signed 8-bit integer. */
  getInt8() {
    const value = this.data.readInt8(this.offset);
    this.offset += 1;
    return value;
  }

  /** Reads an unsigned 8-bit integer. */
  getUint8() {
    const value = this.data.readUInt8(this.offset);
    this.offset += 1;
    return value;
  }

  /** Reads a signed 16-bit little-endian integer. */
  getInt16() {
    const value = this.data.readInt16LE(this.offset);
    this.offset += 2;
    return value;
  }

  /** Reads an unsigned 16-bit little-endian integer. */
  getUint16() {
    const value = this.data.readUInt16LE(this.offset);
    this.offset += 2;
    return value;
  }

  /** Reads a signed 32-bit little-endian integer. */
  getInt32() {
    const value = this.data.readInt32LE(this.offset);
    this.offset += 4;
    return value;
  }

  /** Reads an unsigned 32-bit little-endian integer. */
  getUint32() {
    const value = this.data.readUInt32LE(this.offset);
    this.offset += 4;
    return value;
  }

  /** Reads a 32-bit little-endian float. */
  getFloat32() {
    const value = this.data.readFloatLE(this.offset);
    this.offset += 4;
    return value;
  }

  /**
   * Reads a string stored as an unsigned 16-bit little-endian byte count
   * followed by that many encoded bytes.
   */
  getString(encoding = 'utf8') {
    const length = this.getUint16();
    const value = this.data.toString(encoding, this.offset, this.offset + length);
    this.offset += length;
    return value;
  }

  /** Number of bytes between the current offset and the end of the data. */
  getRemainingSize() {
    return this.data.length - this.offset;
  }
}
// Exports both the writer (Datagram) and the reader (DatagramIterator).
module.exports = {Datagram, DatagramIterator};
const os = require('os');
const net = require('net');
const dgram = require('dgram');
const { Client } = require('./client.js');
const { Datagram, DatagramIterator } = require('./datagram.js');
/**
 * TCP server accepting client connections on port 5185.
 *
 * For every connection it creates a Client, decodes length-prefixed control
 * messages from the TCP stream, and opens a companion UDP socket whose port
 * is announced to the client in a hello message.  Messages arriving on either
 * socket are handed to Client.read().
 */
const server = net.createServer(socket => {
  const client = new Client();

  // Decodes a single message.  The payload comes straight off the network,
  // so a truncated or malformed message must not take the whole server down:
  // log it and carry on.
  const dispatch = payload => {
    try {
      client.read(new DatagramIterator(payload));
    } catch (err) {
      console.log("Dropping malformed message from", client.hostname, "-", err.message);
    }
  };

  // Listen for control messages on the TCP socket.
  let buffer = Buffer.alloc(0);
  socket.on('data', data => {
    // Concatenate newly received data into the buffer
    buffer = Buffer.concat([buffer, data]);

    // Read all the datagrams that have been received in their entirety.  Each
    // one is prefixed by its length as a 32-bit little-endian integer.
    while (buffer.length >= 4) {
      const length = buffer.readUInt32LE(0);
      if (buffer.length < 4 + length) {
        // The rest of this datagram has not arrived yet.
        break;
      }
      if (length > 0) {
        dispatch(buffer.subarray(4, 4 + length));
      }
      buffer = buffer.subarray(4 + length);
    }
  });

  // Without an 'error' listener, a connection reset would be thrown as an
  // uncaught exception.  The 'close' handler below does the cleanup.
  socket.on('error', err => {
    console.log("TCP error from", client.hostname, "-", err.message);
  });

  // Also open a UDP socket on any available port.  (Some Node 18.0-18.3
  // releases reported the address family as a number rather than a string.)
  const family = socket.address().family;
  const udpSocket = dgram.createSocket((family === 'IPv6' || family === 6) ? 'udp6' : 'udp4');
  let udpOpen = true;

  // Closes the UDP socket at most once; closing it twice would throw.
  const closeUdp = () => {
    if (udpOpen) {
      udpOpen = false;
      udpSocket.close();
    }
  };

  udpSocket.on('listening', () => {
    if (socket.destroyed) {
      // The client already went away; nobody to send the hello to.
      return;
    }

    // Compose a hello message to the client with our UDP port.
    const udpPort = udpSocket.address().port;
    const hello = new Datagram();
    hello.addUint8(1); // type
    hello.addString(os.hostname()); // hostname
    hello.addString("node-pstats"); // progname
    hello.addUint16(udpPort);

    // Prefix header and send it down.
    const datagram = Buffer.allocUnsafe(4 + hello.length);
    datagram.writeUInt32LE(hello.length, 0);
    hello.data.copy(datagram, 4);
    socket.write(datagram);
  });

  udpSocket.on('message', data => {
    // A packet too short to even hold the checksum cannot be valid.
    if (data.length < 2) {
      console.log("Dropping bad UDP packet");
      return;
    }

    // Verify the checksum: the leading 16-bit value must equal the sum of
    // all following bytes, modulo 2^16.
    const checksum1 = data.readUInt16LE(0);
    let checksum2 = 0;
    for (let i = 2; i < data.length; ++i) {
      checksum2 = (checksum2 + data[i]) & 0xffff;
    }
    if (checksum1 !== checksum2) {
      console.log("Dropping bad UDP packet");
      return;
    }

    dispatch(data.subarray(2));
  });

  udpSocket.on('error', err => {
    console.log("UDP error for", client.hostname, "-", err.message);
    closeUdp();
  });

  udpSocket.on('close', () => {
    udpOpen = false;
    console.log("Lost UDP connection to", client.hostname);
  });

  udpSocket.bind(0);

  // If the TCP socket is closed, we close the UDP socket as well.
  socket.on('close', () => {
    console.log("Lost TCP connection to", client.hostname);
    closeUdp();
  });
});

server.listen(5185);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment