Created
June 15, 2021 16:04
-
-
Save loopy321/9636b85605dfce2250c7660f3f7d5452 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/controller/comms/Comms.ts b/../../pool2/nodejs-poolController/controller/comms/Comms.ts | |
index 38289b8..1f8a77e 100755 | |
--- a/controller/comms/Comms.ts | |
+++ b/../../pool2/nodejs-poolController/controller/comms/Comms.ts | |
@@ -22,7 +22,7 @@ import { logger } from '../../logger/Logger'; | |
import * as net from 'net'; | |
import { setTimeout, setInterval } from 'timers'; | |
import { Message, Outbound, Inbound, Response } from './messages/Messages'; | |
-import { OutboundMessageError } from '../Errors'; | |
+import { MessageError, OutboundMessageError } from '../Errors'; | |
const extend = require("extend"); | |
export class Connection { | |
constructor() { | |
@@ -41,6 +41,7 @@ export class Connection { | |
} | |
public isRTS: boolean = true; | |
public emitter: EventEmitter; | |
+ public get enabled(): boolean { return typeof this._cfg !== 'undefined' && this._cfg.enabled; } | |
public async openAsync(): Promise<boolean> { | |
if (typeof (this.buffer) === 'undefined') { | |
this.buffer = new SendRecieveBuffer(); | |
@@ -130,7 +131,7 @@ export class Connection { | |
// for a successul connect and false otherwise. | |
sp.on('open', () => { | |
if (typeof conn._port !== 'undefined') logger.info(`Serial Port: ${this._cfg.rs485Port} recovered from lost connection.`) | |
- else logger.info(`Serial port: ${this._cfg.rs485Port} request to open succeeded without error`); | |
+ else logger.info(`Serial port: ${this._cfg.rs485Port} request to open successful`); | |
this._port = sp; | |
this.isOpen = true; | |
sp.on('data', (data) => { if (!this.mockPort && !this.isPaused) this.emitter.emit('packetread', data); this.resetConnTimer(); }); | |
@@ -149,88 +150,8 @@ export class Connection { | |
}); | |
} | |
} | |
- //public open(timeOut?: string) { | |
- // if (conn._cfg.netConnect && !conn._cfg.mockPort) { | |
- // if (typeof conn._port !== 'undefined' !&& conn._port.destroyed) conn._port.destroy(); | |
- // let nc = conn._port = new net.Socket(); | |
- // nc.connect(conn._cfg.netPort, conn._cfg.netHost, function () { | |
- // if (timeOut === 'retry_timeout' || timeOut === 'timeout') | |
- // logger.warn('Net connect (socat) trying to recover from lost connection.'); | |
- // }); | |
- // nc.on('data', function (data) { | |
- // conn.isOpen = true; | |
- // if (timeOut === 'retry_timeout' || timeOut === 'timeout') { | |
- // logger.info(`Net connect (socat) connected to: ${conn._cfg.netHost}:${conn._cfg.netPort}`); | |
- // timeOut = undefined; | |
- // } | |
- // if (data.length > 0 && !conn.isPaused) conn.emitter.emit('packetread', data); | |
- // conn.resetConnTimer('timeout'); | |
- // }); | |
- // } | |
- // else { | |
- // var sp: SerialPort = null; | |
- // if (conn._cfg.mockPort) { | |
- // this.mockPort = true; | |
- // SerialPort.Binding = MockBinding; | |
- // let portPath = 'FAKE_PORT'; | |
- // MockBinding.createPort(portPath, { echo: false, record: true }); | |
- // sp = new SerialPort(portPath, { autoOpen: false }); | |
- // } | |
- // else { | |
- // this.mockPort = false; | |
- // sp = new SerialPort(conn._cfg.rs485Port, conn._cfg.portSettings); | |
- // } | |
- | |
- // conn._port = sp; | |
- // sp.open(function(err) { | |
- // if (err) { | |
- // conn.resetConnTimer(); | |
- // conn.isOpen = false; | |
- // logger.error(`Error opening port: ${err.message}. ${conn._cfg.inactivityRetry > 0 ? `Retry in ${conn._cfg.inactivityRetry} seconds` : `Never retrying; inactivityRetry set to ${conn._cfg.inactivityRetry}`}`); | |
- // } | |
- // else | |
- // logger.info(`Serial port: ${ this.path } request to open succeeded without error`); | |
- // }); | |
- // sp.on('open', function() { | |
- // if (timeOut === 'retry_timeout' || timeOut === 'timeout') | |
- // logger.error('Serial port %s recovering from lost connection', conn._cfg.rs485Port); | |
- // else | |
- // logger.info(`Serial port: ${ this.path } opened`); | |
- // conn.isOpen = true; | |
- // }); | |
- // // RKS: 06-16-20 -- Unsure why we are using a stream event here. The data | |
- // // is being sent via the data event and I can find no reference to the readable event. | |
- // sp.on('data', function (data) { | |
- // if (!this.mockPort) { | |
- // if (!conn.isPaused) conn.emitter.emit('packetread', data); | |
- // } | |
- // conn.resetConnTimer(); | |
- // }); | |
- // //sp.on('readable', function () { | |
- // // if (!this.mockPort) { | |
- // // // If we are paused just read the port and do nothing with it. | |
- // // if (conn.isPaused) | |
- // // sp.read(); | |
- // // else | |
- // // conn.emitter.emit('packetread', sp.read()); | |
- // // conn.resetConnTimer(); | |
- // // } | |
- // //}); | |
- | |
- // } | |
- // if (typeof (conn.buffer) === 'undefined') { | |
- // conn.buffer = new SendRecieveBuffer(); | |
- // conn.emitter.on('packetread', function(pkt) { conn.buffer.pushIn(pkt); }); | |
- // conn.emitter.on('messagewrite', function(msg) { conn.buffer.pushOut(msg); }); | |
- // } | |
- // conn.resetConnTimer('retry_timeout'); | |
- // conn._port.on('error', function(err) { | |
- // logger.error(`Error opening port: ${err.message}. ${conn._cfg.inactivityRetry > 0 ? `Retry in ${conn._cfg.inactivityRetry} seconds` : `Never retrying; inactivityRetry set to ${conn._cfg.inactivityRetry}`}`); | |
- // conn.resetConnTimer(); | |
- // conn.isOpen = false; | |
- // }); | |
- //} | |
- public close() { | |
+ public closeAsync() { | |
+ try { | |
if (conn.connTimer) clearTimeout(conn.connTimer); | |
if (typeof (conn._port) !== 'undefined' && conn._cfg.netConnect) { | |
if (typeof (conn._port.destroy) !== 'function') | |
@@ -240,7 +161,8 @@ export class Connection { | |
else | |
conn._port.destroy(); | |
} | |
- conn.buffer.close(); | |
+ if (typeof conn.buffer !== 'undefined') conn.buffer.close(); | |
+ } catch (err) { logger.error(`Error closing comms connection: ${err.message}`); } | |
} | |
public drain(cb: Function) { | |
if (typeof (conn._port.drain) === 'function') | |
@@ -258,9 +180,10 @@ export class Connection { | |
conn._port.write(bytes, cb); | |
} | |
public async stopAsync() { | |
- Promise.resolve() | |
- .then(function() { conn.close(); }) | |
- .then(function() { console.log('closed connection'); }); | |
+ try { | |
+ await conn.closeAsync(); | |
+ logger.info(`Closed serial communications connection.`); | |
+ } catch (err) { logger.error(`Error closing comms connection: ${err.message}`); } | |
} | |
public init() { | |
conn._cfg = config.getSection('controller.comms', { | |
@@ -272,7 +195,7 @@ export class Connection { | |
netPort: 9801, | |
inactivityRetry: 10 | |
}); | |
- conn.openAsync(); | |
+ if (conn._cfg.enabled) conn.openAsync(); | |
config.emitter.on('reloaded', () => { | |
console.log('Config reloaded'); | |
this.reloadConfig(config.getSection('controller.comms', { | |
@@ -297,9 +220,9 @@ export class Connection { | |
inactivityRetry: 10 | |
}, cfg); | |
if (JSON.stringify(c) !== JSON.stringify(this._cfg)) { | |
- this.close(); | |
+ this.closeAsync(); | |
this._cfg = c; | |
- this.openAsync(); | |
+ if (this._cfg.enabled) this.openAsync(); | |
} | |
} | |
public queueSendMessage(msg: Outbound) { conn.emitter.emit('messagewrite', msg); } | |
@@ -328,7 +251,8 @@ export class SendRecieveBuffer { | |
private _waitingPacket: Outbound; | |
private _msg: Inbound; | |
public pushIn(pkt) { | |
- conn.buffer._inBuffer.push.apply(conn.buffer._inBuffer, pkt.toJSON().data); setTimeout(() => { this.processPackets(); }, 0); } | |
+ conn.buffer._inBuffer.push.apply(conn.buffer._inBuffer, pkt.toJSON().data); setTimeout(() => { this.processPackets(); }, 0); | |
+ } | |
public pushOut(msg) { conn.buffer._outBuffer.push(msg); setTimeout(() => { this.processPackets(); }, 0); } | |
public clear() { conn.buffer._inBuffer.length = 0; conn.buffer._outBuffer.length = 0; } | |
public close() { clearTimeout(conn.buffer.procTimer); conn.buffer.clear(); this._msg = undefined; } | |
@@ -372,18 +296,32 @@ export class SendRecieveBuffer { | |
return false; | |
} | |
protected processOutbound() { | |
- if (conn.isOpen && conn.isRTS) { | |
+ let msg: Outbound; | |
if (!conn.buffer.processWaitPacket() && conn.buffer._outBuffer.length > 0) { | |
+ if (conn.isOpen) { | |
+ if (conn.isRTS) { | |
+ msg = conn.buffer._outBuffer.shift(); | |
+ if (typeof msg === 'undefined' || !msg) return; | |
// If the serial port is busy we don't want to process any outbound. However, this used to | |
// not process the outbound even when the incoming bytes didn't mean anything. Now we only delay | |
// the outbound when we actually have a message signatures to process. | |
- var msg: Outbound = conn.buffer._outBuffer.shift(); | |
- if (typeof msg === 'undefined' || !msg) return; | |
conn.buffer.writeMessage(msg); | |
} | |
} | |
+ else { | |
+ // port is closed, reject message | |
+ msg = conn.buffer._outBuffer.shift(); | |
+ msg.failed = true; | |
+ logger.warn(`Comms port is not open. Message aborted: ${msg.toShortPacket()}`); | |
+ // This is a hard fail. We don't have any more tries left and the message didn't | |
+ // make it onto the wire. | |
+ let error = new OutboundMessageError(msg, `Comms port is not open. Message aborted: ${msg.toShortPacket()}`); | |
+ if (typeof msg.onComplete === 'function') msg.onComplete(error, undefined); | |
+ conn.buffer._waitingPacket = null; | |
+ } | |
+ } | |
// RG: added the last `|| typeof msg !== 'undef'` because virtual chem controller only sends a single packet | |
- // but this condition would be eval'd before the callback of conn.write was calles and the outbound packet | |
+ // but this condition would be eval'd before the callback of conn.write was called and the outbound packet | |
// would be sitting idle for eternity. | |
if (conn.buffer._outBuffer.length > 0 || typeof conn.buffer._waitingPacket !== 'undefined' || conn.buffer._waitingPacket || typeof msg !== 'undefined') { | |
// Come back later as we still have items to send. | |
@@ -414,7 +352,8 @@ export class SendRecieveBuffer { | |
// we have an RTS semaphore and a waiting response might make it go here. | |
msg.failed = true; | |
conn.buffer._waitingPacket = null; | |
- logger.warn(`Message aborted after ${ msg.tries } attempt(s): ${ msg.toShortPacket() }`); | |
+ if (typeof msg.onAbort === 'function') msg.onAbort(); | |
+ else logger.warn(`Message aborted after ${msg.tries} attempt(s): ${msg.toShortPacket()}`); | |
let err = new OutboundMessageError(msg, `Message aborted after ${msg.tries} attempt(s): ${msg.toShortPacket()}`); | |
if (typeof msg.onComplete === 'function') msg.onComplete(err, undefined); | |
if (msg.requiresResponse) { | |
@@ -471,7 +410,6 @@ export class SendRecieveBuffer { | |
} | |
private clearResponses(msgIn: Inbound) { | |
if (conn.buffer._outBuffer.length === 0 && typeof (conn.buffer._waitingPacket) !== 'object' && conn.buffer._waitingPacket) return; | |
- | |
var callback; | |
let msgOut = conn.buffer._waitingPacket; | |
if (typeof (conn.buffer._waitingPacket) !== 'undefined' && conn.buffer._waitingPacket) { | |
@@ -503,12 +441,12 @@ export class SendRecieveBuffer { | |
if (typeof out === 'undefined') continue; | |
let resp = out.response; | |
if (out.requiresResponse) { | |
- if (resp instanceof Response && resp.isResponse(msgIn, out)) { | |
+ if (resp instanceof Response && resp.isResponse(msgIn, out) && (typeof out.scope === 'undefined' || out.scope === msgOut.scope)) { | |
resp.message = msgIn; | |
if (typeof (resp.callback) === 'function' && resp.callback) callback = resp.callback; | |
conn.buffer._outBuffer.splice(i, 1); | |
} | |
- else if (typeof resp === 'function' && resp(msgIn, out)) { | |
+ else if (typeof resp === 'function' && resp(msgIn, out) && (typeof out.scope === 'undefined' || out.scope === msgOut.scope)) { | |
if (typeof out.onResponseProcessed !== 'undefined') callback = out.onResponseProcessed; | |
conn.buffer._outBuffer.splice(i, 1); | |
} | |
@@ -526,11 +464,13 @@ export class SendRecieveBuffer { | |
conn.buffer.counter.collisions += msg.collisions; | |
if (msg.isValid) { | |
conn.buffer.counter.success++; | |
+ conn.buffer.counter.updatefailureRate(); | |
msg.process(); | |
conn.buffer.clearResponses(msg); | |
} | |
else { | |
conn.buffer.counter.failed++; | |
+ conn.buffer.counter.updatefailureRate(); | |
console.log('RS485 Stats:' + JSON.stringify(conn.buffer.counter)); | |
ndx = this.rewindFailedMessage(msg, ndx); | |
} | |
@@ -582,11 +522,16 @@ export class Counter { | |
this.failed = 0; | |
this.bytesSent = 0; | |
this.collisions = 0; | |
+ this.failureRate = '0.00%'; | |
} | |
public bytesReceived: number; | |
public success: number; | |
public failed: number; | |
public bytesSent: number; | |
public collisions: number; | |
+ public failureRate: string; | |
+ public updatefailureRate():void { | |
+ conn.buffer.counter.failureRate = `${(conn.buffer.counter.failed / (conn.buffer.counter.failed + conn.buffer.counter.success) * 100).toFixed(2)}%`; | |
+ } | |
} | |
export var conn: Connection = new Connection(); | |
\ No newline at end of file |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment