Skip to content

Instantly share code, notes, and snippets.

@loopy321
Created June 15, 2021 16:04
Show Gist options
  • Save loopy321/9636b85605dfce2250c7660f3f7d5452 to your computer and use it in GitHub Desktop.
Save loopy321/9636b85605dfce2250c7660f3f7d5452 to your computer and use it in GitHub Desktop.
diff --git a/controller/comms/Comms.ts b/../../pool2/nodejs-poolController/controller/comms/Comms.ts
index 38289b8..1f8a77e 100755
--- a/controller/comms/Comms.ts
+++ b/../../pool2/nodejs-poolController/controller/comms/Comms.ts
@@ -22,7 +22,7 @@ import { logger } from '../../logger/Logger';
import * as net from 'net';
import { setTimeout, setInterval } from 'timers';
import { Message, Outbound, Inbound, Response } from './messages/Messages';
-import { OutboundMessageError } from '../Errors';
+import { MessageError, OutboundMessageError } from '../Errors';
const extend = require("extend");
export class Connection {
constructor() {
@@ -41,6 +41,7 @@ export class Connection {
}
public isRTS: boolean = true;
public emitter: EventEmitter;
+ public get enabled(): boolean { return typeof this._cfg !== 'undefined' && this._cfg.enabled; }
public async openAsync(): Promise<boolean> {
if (typeof (this.buffer) === 'undefined') {
this.buffer = new SendRecieveBuffer();
@@ -130,7 +131,7 @@ export class Connection {
// for a successul connect and false otherwise.
sp.on('open', () => {
if (typeof conn._port !== 'undefined') logger.info(`Serial Port: ${this._cfg.rs485Port} recovered from lost connection.`)
- else logger.info(`Serial port: ${this._cfg.rs485Port} request to open succeeded without error`);
+ else logger.info(`Serial port: ${this._cfg.rs485Port} request to open successful`);
this._port = sp;
this.isOpen = true;
sp.on('data', (data) => { if (!this.mockPort && !this.isPaused) this.emitter.emit('packetread', data); this.resetConnTimer(); });
@@ -149,88 +150,8 @@ export class Connection {
});
}
}
- //public open(timeOut?: string) {
- // if (conn._cfg.netConnect && !conn._cfg.mockPort) {
- // if (typeof conn._port !== 'undefined' !&& conn._port.destroyed) conn._port.destroy();
- // let nc = conn._port = new net.Socket();
- // nc.connect(conn._cfg.netPort, conn._cfg.netHost, function () {
- // if (timeOut === 'retry_timeout' || timeOut === 'timeout')
- // logger.warn('Net connect (socat) trying to recover from lost connection.');
- // });
- // nc.on('data', function (data) {
- // conn.isOpen = true;
- // if (timeOut === 'retry_timeout' || timeOut === 'timeout') {
- // logger.info(`Net connect (socat) connected to: ${conn._cfg.netHost}:${conn._cfg.netPort}`);
- // timeOut = undefined;
- // }
- // if (data.length > 0 && !conn.isPaused) conn.emitter.emit('packetread', data);
- // conn.resetConnTimer('timeout');
- // });
- // }
- // else {
- // var sp: SerialPort = null;
- // if (conn._cfg.mockPort) {
- // this.mockPort = true;
- // SerialPort.Binding = MockBinding;
- // let portPath = 'FAKE_PORT';
- // MockBinding.createPort(portPath, { echo: false, record: true });
- // sp = new SerialPort(portPath, { autoOpen: false });
- // }
- // else {
- // this.mockPort = false;
- // sp = new SerialPort(conn._cfg.rs485Port, conn._cfg.portSettings);
- // }
-
- // conn._port = sp;
- // sp.open(function(err) {
- // if (err) {
- // conn.resetConnTimer();
- // conn.isOpen = false;
- // logger.error(`Error opening port: ${err.message}. ${conn._cfg.inactivityRetry > 0 ? `Retry in ${conn._cfg.inactivityRetry} seconds` : `Never retrying; inactivityRetry set to ${conn._cfg.inactivityRetry}`}`);
- // }
- // else
- // logger.info(`Serial port: ${ this.path } request to open succeeded without error`);
- // });
- // sp.on('open', function() {
- // if (timeOut === 'retry_timeout' || timeOut === 'timeout')
- // logger.error('Serial port %s recovering from lost connection', conn._cfg.rs485Port);
- // else
- // logger.info(`Serial port: ${ this.path } opened`);
- // conn.isOpen = true;
- // });
- // // RKS: 06-16-20 -- Unsure why we are using a stream event here. The data
- // // is being sent via the data event and I can find no reference to the readable event.
- // sp.on('data', function (data) {
- // if (!this.mockPort) {
- // if (!conn.isPaused) conn.emitter.emit('packetread', data);
- // }
- // conn.resetConnTimer();
- // });
- // //sp.on('readable', function () {
- // // if (!this.mockPort) {
- // // // If we are paused just read the port and do nothing with it.
- // // if (conn.isPaused)
- // // sp.read();
- // // else
- // // conn.emitter.emit('packetread', sp.read());
- // // conn.resetConnTimer();
- // // }
- // //});
-
- // }
- // if (typeof (conn.buffer) === 'undefined') {
- // conn.buffer = new SendRecieveBuffer();
- // conn.emitter.on('packetread', function(pkt) { conn.buffer.pushIn(pkt); });
- // conn.emitter.on('messagewrite', function(msg) { conn.buffer.pushOut(msg); });
- // }
- // conn.resetConnTimer('retry_timeout');
- // conn._port.on('error', function(err) {
- // logger.error(`Error opening port: ${err.message}. ${conn._cfg.inactivityRetry > 0 ? `Retry in ${conn._cfg.inactivityRetry} seconds` : `Never retrying; inactivityRetry set to ${conn._cfg.inactivityRetry}`}`);
- // conn.resetConnTimer();
- // conn.isOpen = false;
- // });
- //}
- public close() {
+ public closeAsync() {
+ try {
if (conn.connTimer) clearTimeout(conn.connTimer);
if (typeof (conn._port) !== 'undefined' && conn._cfg.netConnect) {
if (typeof (conn._port.destroy) !== 'function')
@@ -240,7 +161,8 @@ export class Connection {
else
conn._port.destroy();
}
- conn.buffer.close();
+ if (typeof conn.buffer !== 'undefined') conn.buffer.close();
+ } catch (err) { logger.error(`Error closing comms connection: ${err.message}`); }
}
public drain(cb: Function) {
if (typeof (conn._port.drain) === 'function')
@@ -258,9 +180,10 @@ export class Connection {
conn._port.write(bytes, cb);
}
public async stopAsync() {
- Promise.resolve()
- .then(function() { conn.close(); })
- .then(function() { console.log('closed connection'); });
+ try {
+ await conn.closeAsync();
+ logger.info(`Closed serial communications connection.`);
+ } catch (err) { logger.error(`Error closing comms connection: ${err.message}`); }
}
public init() {
conn._cfg = config.getSection('controller.comms', {
@@ -272,7 +195,7 @@ export class Connection {
netPort: 9801,
inactivityRetry: 10
});
- conn.openAsync();
+ if (conn._cfg.enabled) conn.openAsync();
config.emitter.on('reloaded', () => {
console.log('Config reloaded');
this.reloadConfig(config.getSection('controller.comms', {
@@ -297,9 +220,9 @@ export class Connection {
inactivityRetry: 10
}, cfg);
if (JSON.stringify(c) !== JSON.stringify(this._cfg)) {
- this.close();
+ this.closeAsync();
this._cfg = c;
- this.openAsync();
+ if (this._cfg.enabled) this.openAsync();
}
}
public queueSendMessage(msg: Outbound) { conn.emitter.emit('messagewrite', msg); }
@@ -328,7 +251,8 @@ export class SendRecieveBuffer {
private _waitingPacket: Outbound;
private _msg: Inbound;
public pushIn(pkt) {
- conn.buffer._inBuffer.push.apply(conn.buffer._inBuffer, pkt.toJSON().data); setTimeout(() => { this.processPackets(); }, 0); }
+ conn.buffer._inBuffer.push.apply(conn.buffer._inBuffer, pkt.toJSON().data); setTimeout(() => { this.processPackets(); }, 0);
+ }
public pushOut(msg) { conn.buffer._outBuffer.push(msg); setTimeout(() => { this.processPackets(); }, 0); }
public clear() { conn.buffer._inBuffer.length = 0; conn.buffer._outBuffer.length = 0; }
public close() { clearTimeout(conn.buffer.procTimer); conn.buffer.clear(); this._msg = undefined; }
@@ -372,18 +296,32 @@ export class SendRecieveBuffer {
return false;
}
protected processOutbound() {
- if (conn.isOpen && conn.isRTS) {
+ let msg: Outbound;
if (!conn.buffer.processWaitPacket() && conn.buffer._outBuffer.length > 0) {
+ if (conn.isOpen) {
+ if (conn.isRTS) {
+ msg = conn.buffer._outBuffer.shift();
+ if (typeof msg === 'undefined' || !msg) return;
// If the serial port is busy we don't want to process any outbound. However, this used to
// not process the outbound even when the incoming bytes didn't mean anything. Now we only delay
// the outbound when we actually have a message signatures to process.
- var msg: Outbound = conn.buffer._outBuffer.shift();
- if (typeof msg === 'undefined' || !msg) return;
conn.buffer.writeMessage(msg);
}
}
+ else {
+ // port is closed, reject message
+ msg = conn.buffer._outBuffer.shift();
+ msg.failed = true;
+ logger.warn(`Comms port is not open. Message aborted: ${msg.toShortPacket()}`);
+ // This is a hard fail. We don't have any more tries left and the message didn't
+ // make it onto the wire.
+ let error = new OutboundMessageError(msg, `Comms port is not open. Message aborted: ${msg.toShortPacket()}`);
+ if (typeof msg.onComplete === 'function') msg.onComplete(error, undefined);
+ conn.buffer._waitingPacket = null;
+ }
+ }
// RG: added the last `|| typeof msg !== 'undef'` because virtual chem controller only sends a single packet
- // but this condition would be eval'd before the callback of conn.write was calles and the outbound packet
+ // but this condition would be eval'd before the callback of conn.write was calls and the outbound packet
// would be sitting idle for eternity.
if (conn.buffer._outBuffer.length > 0 || typeof conn.buffer._waitingPacket !== 'undefined' || conn.buffer._waitingPacket || typeof msg !== 'undefined') {
// Come back later as we still have items to send.
@@ -414,7 +352,8 @@ export class SendRecieveBuffer {
// we have an RTS semaphore and a waiting response might make it go here.
msg.failed = true;
conn.buffer._waitingPacket = null;
- logger.warn(`Message aborted after ${ msg.tries } attempt(s): ${ msg.toShortPacket() }`);
+ if (typeof msg.onAbort === 'function') msg.onAbort();
+ else logger.warn(`Message aborted after ${msg.tries} attempt(s): ${msg.toShortPacket()}`);
let err = new OutboundMessageError(msg, `Message aborted after ${msg.tries} attempt(s): ${msg.toShortPacket()}`);
if (typeof msg.onComplete === 'function') msg.onComplete(err, undefined);
if (msg.requiresResponse) {
@@ -471,7 +410,6 @@ export class SendRecieveBuffer {
}
private clearResponses(msgIn: Inbound) {
if (conn.buffer._outBuffer.length === 0 && typeof (conn.buffer._waitingPacket) !== 'object' && conn.buffer._waitingPacket) return;
-
var callback;
let msgOut = conn.buffer._waitingPacket;
if (typeof (conn.buffer._waitingPacket) !== 'undefined' && conn.buffer._waitingPacket) {
@@ -503,12 +441,12 @@ export class SendRecieveBuffer {
if (typeof out === 'undefined') continue;
let resp = out.response;
if (out.requiresResponse) {
- if (resp instanceof Response && resp.isResponse(msgIn, out)) {
+ if (resp instanceof Response && resp.isResponse(msgIn, out) && (typeof out.scope === 'undefined' || out.scope === msgOut.scope)) {
resp.message = msgIn;
if (typeof (resp.callback) === 'function' && resp.callback) callback = resp.callback;
conn.buffer._outBuffer.splice(i, 1);
}
- else if (typeof resp === 'function' && resp(msgIn, out)) {
+ else if (typeof resp === 'function' && resp(msgIn, out) && (typeof out.scope === 'undefined' || out.scope === msgOut.scope)) {
if (typeof out.onResponseProcessed !== 'undefined') callback = out.onResponseProcessed;
conn.buffer._outBuffer.splice(i, 1);
}
@@ -526,11 +464,13 @@ export class SendRecieveBuffer {
conn.buffer.counter.collisions += msg.collisions;
if (msg.isValid) {
conn.buffer.counter.success++;
+ conn.buffer.counter.updatefailureRate();
msg.process();
conn.buffer.clearResponses(msg);
}
else {
conn.buffer.counter.failed++;
+ conn.buffer.counter.updatefailureRate();
console.log('RS485 Stats:' + JSON.stringify(conn.buffer.counter));
ndx = this.rewindFailedMessage(msg, ndx);
}
@@ -582,11 +522,16 @@ export class Counter {
this.failed = 0;
this.bytesSent = 0;
this.collisions = 0;
+ this.failureRate = '0.00%';
}
public bytesReceived: number;
public success: number;
public failed: number;
public bytesSent: number;
public collisions: number;
+ public failureRate: string;
+ public updatefailureRate():void {
+ conn.buffer.counter.failureRate = `${(conn.buffer.counter.failed / (conn.buffer.counter.failed + conn.buffer.counter.success) * 100).toFixed(2)}%`;
+ }
}
export var conn: Connection = new Connection();
\ No newline at end of file
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment