Skip to content

Instantly share code, notes, and snippets.

@hillar
Last active February 11, 2020 08:18
Show Gist options
  • Save hillar/e1dabbb47e32da9836f7284bf3e6de93 to your computer and use it in GitHub Desktop.
Save hillar/e1dabbb47e32da9836f7284bf3e6de93 to your computer and use it in GitHub Desktop.
stream transform with internal pooling
import { Transform } from "stream";
import { inspect } from "util";
import { kStringMaxLength as STRINGMAXLENGTH } from "buffer";
import { InfluxUdp } from "./influxdb-udp.mjs";
/**
 * Pause for a while.
 *
 * @param {number} ms - delay in milliseconds; rounded to the nearest integer
 *   before the timer is armed
 * @param {...*} msg - optional labels; accepted but not used at the moment
 * @returns {Promise<undefined>} promise that resolves (with no value) once the
 *   timer fires
 */
export function wait(ms, ...msg) {
  const delay = Math.round(ms);
  return new Promise(function arm(done) {
    // setTimeout invokes `done` without arguments, so the promise resolves with undefined
    setTimeout(done, delay);
  });
}
/**
 * Report progress on standard output.
 *
 * On a terminal the current line is cleared and overwritten with the
 * JSON-encoded argument list; otherwise the arguments are joined with single
 * spaces and handed to console.debug.
 *
 * @param {...*} progress - values to report
 */
export function printProgress(...progress) {
  const out = process.stdout;
  if (!out.isTTY) {
    console.debug(progress.join(" "));
    return;
  }
  // redraw in place: wipe the line, go back to column 0, then print
  out.clearLine();
  out.cursorTo(0);
  out.write(JSON.stringify(progress));
}
// pool: array of "hostname:port" strings (the port may be omitted when options.defaultport is set)
// tti: time in milliseconds it takes for a server to become functional; used as the pause between reconnection attempts
// tries: how many connection attempts to make to an unresponsive server
// the stream fails if every server in the pool stays unreachable for (tti * tries)
/**
 * Object-mode Transform that hands every incoming object to one server out of
 * a pool and pushes the server's answer downstream.
 *
 * Each pool entry is a slot that carries at most one request at a time. When
 * a request fails, the slot is kept busy by a pinger until the server answers
 * again, and the data is retried on the next free slot.
 *
 * Besides answers the stream emits control objects whose only key starts with
 * "__": `__failed__` (a request failed), `__up__` (a server recovered) and
 * `__done__` (emitted once, just before the stream ends). Incoming objects
 * that have no key other than "__"-prefixed ones are passed through untouched.
 *
 * The request itself is supplied either as `options.ask` or by overriding
 * `_ask` in a subclass.
 */
export class Ask extends Transform {
  name = "";
  pool = [];
  tti = 1000 * 3;
  tries = 3;
  counter = 0; // data objects dispatched so far
  tooked = 0; // total time (ms) spent in answered requests
  average = undefined; // mean time (ms) per answered request
  _askState = {
    running: [], // per slot: promise of the request/ping occupying it, or undefined
    down: [], // per slot: number of failed pings since the server went down
    flushing: false, // set once _flush is done waiting; stops the pingers
    inflight: 0, // data objects accepted but not answered yet
    answered: 0 // requests answered successfully
  };

  /**
   * @param {object} options - stream options plus:
   * @param {string[]} options.pool - "hostname:port" strings; required, non-empty
   * @param {string|number} [options.defaultport] - port for entries without one
   * @param {Function} [options.ask] - replaces `_ask`
   * @param {number} [options.tti] - pause between pings, in milliseconds
   * @param {number} [options.tries] - failed-ping threshold used by `alldown`
   * @param {string} [options.name] - service name; defaults to the class name
   * @throws {Error} when the pool is missing/empty or an entry has no hostname or port
   */
  constructor(options) {
    if (!options || !options.pool || !Array.isArray(options.pool) || !options.pool.length) throw new Error("No pool. Required array of hostname:port pairs");
    // copy instead of writing objectMode/highWaterMark into the caller's object
    super({ ...options, objectMode: true, highWaterMark: 1 });
    for (const server of options.pool) {
      let [hostname, port] = server.split(":");
      if (!port) port = options.defaultport;
      if (hostname.trim().length && port) {
        this.pool.push({ hostname, port });
      } else throw new Error("Missing hostname:port pair");
    }
    if (typeof options.ask === "function") this._ask = options.ask;
    if (options.tti) this.tti = options.tti;
    if (options.tries) this.tries = options.tries;
    if (options.name) this.name = options.name;
    else this.name = this.constructor.name;
  }

  /**
   * Performs one request. Must be replaced (options.ask or subclass): the
   * default implementation terminates the process.
   *
   * For data requests the resolved value is pushed downstream and must not be
   * null/undefined; pings call this with an empty object and ignore the value.
   *
   * @param {string} hostname
   * @param {string|number} port
   * @param {object} data - object to send; `{}` for a ping
   * @param {Ask} abort - this stream
   */
  _ask(hostname, port, data, abort) {
    console.error(this.name + ": _ask not implemented");
    process.exit(-1);
  }

  /** Polling interval in milliseconds: mean answer time (1000 until known) divided by pool size. */
  get waittime() {
    return this.average ? Math.round(this.average / this.pool.length) : Math.round(1000 / this.pool.length);
  }

  /** True when every server in the pool has failed more than `tries` pings in a row. */
  get alldown() {
    return this._askState.down.filter(v => v > this.tries).length === this.pool.length;
  }

  /** True while at least one accepted data object has not been answered yet. */
  get somerunning() {
    return this._askState.inflight > 0;
  }

  /** Metrics hook, called after every answer and every recovery; no-op by default. */
  sendmetrix(m) {
    //console.dir(m);
  }

  /**
   * Dispatches a data object to a free slot, or forwards a control object.
   * `next` is called as soon as the object is dispatched, not when it is
   * answered, so several requests can be in flight at once.
   */
  async _transform(current, encoding, next) {
    try {
      const isData = Object.keys(current).some(key => !key.startsWith("__"));
      if (isData) {
        this._askState.inflight++;
        await this.waitforslot(current);
        this.counter++;
      } else {
        this.push(current);
      }
    } catch (error) {
      next(error);
      return;
    }
    next();
  }

  /**
   * Waits until every accepted object has been answered (or the pool is all
   * down / the stream is destroyed), then emits `__done__`.
   */
  async _flush(last) {
    while (this.somerunning && !this.alldown && !this.destroyed) {
      await wait(this.waittime);
    }
    // nothing may be pushed after `last()`: tell the pingers to stop
    this._askState.flushing = true;
    this._askState.running.length = 0;
    this.push({ __done__: { service: this.name, count: this.counter, average: this.average } });
    last();
  }

  /**
   * Starts a request for `data` on the first free slot, polling every
   * `waittime` ms until one is free. Returns without dispatching when the
   * stream gets destroyed.
   */
  async waitforslot(data) {
    let waited = 0;
    while (!this.destroyed) {
      for (let slot = 0; slot < this.pool.length; slot++) {
        if (this._askState.running[slot]) continue;
        // not awaited on purpose: the request runs in the background; an
        // unexpected rejection tears the stream down instead of going unhandled
        this._askState.running[slot] = this.ask(data, slot, waited).catch(error => this.destroy(error));
        return;
      }
      const pause = this.waittime;
      await wait(pause);
      waited += pause;
    }
  }

  /**
   * Sends `data` to the server in `slot` and pushes the answer. On failure
   * emits `__failed__`, starts pinging the server and retries the data on
   * another slot.
   *
   * @param {object} data - object to send
   * @param {number} slot - index into `pool`
   * @param {number} waited - milliseconds spent waiting for the slot
   */
  async ask(data, slot, waited) {
    const start = process.hrtime.bigint();
    const { hostname, port } = this.pool[slot];
    const server = hostname + ":" + port;
    let answer;
    try {
      // only the request itself is guarded, so errors raised while handling
      // a good answer are not mistaken for a server failure
      answer = await this._ask(hostname, port, data, this);
      // a null answer cannot be pushed (push(null) would end the stream)
      if (answer == null) throw new TypeError(this.name + ": _ask resolved without an answer");
    } catch (error) {
      this.push({ __failed__: { error, timestamp: new Date(), server, service: this.name } });
      // the pinger keeps the slot occupied until the server is back
      this._askState.running[slot] = this.ping(slot, hostname, port).catch(err => this.destroy(err));
      await this.waitforslot(data);
      return;
    }
    this._askState.running[slot] = undefined;
    const took = Number(process.hrtime.bigint() - start) / 1e6;
    this.push(answer);
    this.tooked += took;
    this._askState.answered++;
    // averaged over completed requests; `counter` may still be 0 at this point
    this.average = this.tooked / this._askState.answered;
    this._askState.inflight--;
    this.sendmetrix({ waited, took, server, service: this.name, i: data._id });
  }

  /**
   * Pings a failed server every `tti` ms until it answers, the stream is
   * destroyed or flushed. Destroys the stream with "ALL DOWN" once `alldown`
   * becomes true. Frees the slot when done and emits `__up__` on recovery.
   */
  async ping(slot, hostname, port) {
    const start = process.hrtime.bigint();
    const server = hostname + ":" + port;
    let up = false;
    while (!up && !this.destroyed && !this._askState.flushing) {
      try {
        await this._ask(hostname, port, {}, this);
        up = true;
      } catch (error) {
        // programming errors are not outages: report and bail out
        if (error instanceof ReferenceError || error instanceof SyntaxError) {
          console.error(error);
          process.exit(-1);
        }
        if (!this._askState.down[slot]) this._askState.down[slot] = 0;
        this._askState.down[slot]++;
        if (this.alldown) this.destroy(new Error(this.name + " ALL DOWN"));
        else await wait(this.tti);
      }
    }
    this._askState.running[slot] = undefined;
    this._askState.down[slot] = undefined;
    if (up && !this.destroyed && !this._askState.flushing) {
      const downtime = Number(process.hrtime.bigint() - start) / 1e6;
      this.push({ __up__: { downtime, timestamp: new Date(), server, service: this.name } });
      this.sendmetrix({ downtime, server, service: this.name });
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment