Skip to content

Instantly share code, notes, and snippets.

@meredian
Created May 28, 2019 10:16
Show Gist options
  • Save meredian/4fa615654f6c16cd456e33af98cd336a to your computer and use it in GitHub Desktop.
Save meredian/4fa615654f6c16cd456e33af98cd336a to your computer and use it in GitHub Desktop.
Simple CLI tool with no dependencies. It reads a given file line by line, groups the lines into batches, and sends the batches to a list of given HTTP services.
#!/usr/bin/env node
const fs = require('fs');
const url = require('url');
const path = require('path');
const http = require('http');
const readline = require('readline');
const EventEmitter = require('events');
// Shorthand logger used throughout the script.
const p = console.log;

// Positional CLI arguments: the input file first, then one or more target hosts.
const [candiateFilePath, ...hosts] = process.argv.slice(2);

// Usage is shown when the file is missing, help is requested, or no host is given.
const isUsageRequested =
  !candiateFilePath || candiateFilePath === '--help' || hosts.length === 0;

if (isUsageRequested) {
  [
    'Helper tool for sending data from given candidate json-per-line file ',
    'to a given HTTP service. ',
    'It may shovel single file into multiple HTTP-services simultaneously ',
    ' ',
    'Usage: ',
    ' ./data-shoveler.js file.json 127.0.0.1:60000 127.0.0.2:60000 ',
    ' ',
    'Env variables: ',
    ' BATCH_SIZE=50 - set size of batch, send per request. ',
    ' NUMBER=0 - number of data lines to send in total; 0 = "all" ',
  ].forEach((usageLine) => p(usageLine));
  // The usage path always terminates with exit code 1.
  process.exit(1);
}
// Reads an integer from the environment; anything that parses to a falsy
// value (unset, non-numeric, zero) yields the fallback.
const readIntFromEnv = (name, fallback) =>
  parseInt(process.env[name], 10) || fallback;

// NUMBER: total lines to send (0 means "all"); BATCH_SIZE: lines per request.
const number = readIntFromEnv('NUMBER', 0);
const batchSize = readIntFromEnv('BATCH_SIZE', 50);
const resolvedPath = path.resolve(candiateFilePath);

// Startup banner describing the effective configuration.
[
  `Starting shoveling data, full ahead!`,
  ` From file: ${resolvedPath}`,
  ` To hosts: ${hosts.join(', ')}`,
  ` Lines to shovel: ${number || 'all'}`,
  ` Lines per batch: ${batchSize}`,
  ``,
].forEach((bannerLine) => p(bannerLine));
// Wall-clock start, used for the throughput statistics.
const startTime = Date.now();
// Complete batches waiting for a free shoveler.
const preparedBatches = [];
// Batch currently being filled by the line reader.
let nextBatch = [];
// Number of lines accepted into batches so far.
let batchedLines = 0;
// Number of lines whose batch has been answered by a target host.
let confirmedLines = 0;
// Set to true once the input file stream has emitted 'end'.
let isFileClosed = false;
const fileStream = fs.createReadStream(candiateFilePath);
// crlfDelay: Infinity makes "\r\n" always count as ONE line break. With the
// default (100 ms) a "\r" and "\n" that arrive in separate chunks more than
// 100 ms apart are reported as two line breaks, which yields a spurious empty
// line; this script pauses the input while hosts are busy, so such gaps are
// expected.
const rl = readline.createInterface({
  input: fileStream,
  crlfDelay: Infinity,
});
// Shared 'error' handler: reports the failure through the logger and returns.
const errorListener = (err) => {
  p('Error happened!');
  const description = err.toString();
  p(description);
};
/**
 * Event-emitter wrapper for throwing data into outer space - it lets the
 * rest of the script treat communication with a single server as an object.
 *
 * Emits 'done' with the number of lines in the chunk once the request
 * returned by `sendRequest` reports 'complete'.
 *
 * NOTE(review): no 'error' listener is attached to the request in `shovel`,
 * so this object never re-emits a request failure itself.
 */
class HttpShoveler extends EventEmitter {
  /**
   * @param {string} host - "host:port" of the target; "http://" is prepended.
   */
  constructor(host) {
    super();
    this.url = `http://${host}`;
    p(`HttpShoveler created with url ${this.url}`);
    this.parsedUrl = url.parse(this.url);
  }

  /**
   * POSTs the chunk (lines joined with "\n") to the root path of the target.
   * The shoveler counts as busy until the request completes.
   *
   * @param {string[]} chunk - lines to send in a single request.
   */
  shovel(chunk) {
    this.__chunk = chunk;
    p(`Shoveling... ${chunk.length} lines to ${this.url}`);

    const target = this.parsedUrl;
    const body = chunk.join('\n');

    const onComplete = (res) => {
      p(`Shoveled: ${res.statusCode} ${chunk.length} lines to ${this.url}`);
      // Release the slot before announcing completion.
      this.__chunk = null;
      this.emit('done', chunk.length);
    };

    sendRequest(target.hostname, target.port, '/', 'POST', body)
      .on('complete', onComplete);
  }

  /**
   * @returns {boolean} true when no chunk is currently in flight.
   */
  isFree() {
    return this.__chunk ? false : true;
  }
}
// One shoveler per target host. When a shoveler finishes a batch it either
// takes the next queued batch or, if the queue is empty, lets the reader
// continue.
const shovelers = hosts.map((host) => {
  const shoveler = new HttpShoveler(host);

  shoveler.on('done', (shoveledCount) => {
    confirmedLines += shoveledCount;
    rl.emit('batchConsumed');
    if (preparedBatches.length > 0) {
      shoveler.shovel(preparedBatches.shift());
    } else if (!isFileClosed) {
      // Only resume while the file is still being read: once the input has
      // ended there is nothing left to resume, and the reader is already
      // closed at that point.
      rl.resume();
    }
  });
  shoveler.on('error', errorListener);

  return shoveler;
});
// Returns the first shoveler with no batch in flight, or undefined when
// every shoveler is busy.
const findFreeShoveler = () => {
  for (const candidate of shovelers) {
    if (candidate.isFree()) {
      return candidate;
    }
  }
  return undefined;
};
// Logs progress: lines batched, lines still queued, lines confirmed, the
// elapsed time in seconds, and the average confirmed-lines-per-second rate.
const printStats = () => {
  let unsendBatchedCount = 0;
  for (const batch of preparedBatches) {
    unsendBatchedCount += batch.length;
  }

  const elapsedMs = Date.now() - startTime;
  const timeElapsed = elapsedMs / 1000;
  const avgSpeed = Math.round(confirmedLines / timeElapsed);

  [
    `Total batched lines: ${batchedLines}`,
    `Unsend batched lines: ${unsendBatchedCount}`,
    `Confirmes send lines: ${confirmedLines}`,
    `Total time elapsed: ${timeElapsed} sec.`,
    `Avg. speed: ${avgSpeed} lines per sec.`,
  ].forEach((statLine) => p(statLine));
};
// Moves the batch being filled (if it holds any lines) into the queue of
// prepared batches and starts a fresh one. Returns true when something was
// actually queued.
const flushNextBatch = () => {
  if (nextBatch.length === 0) {
    return false;
  }
  preparedBatches.push(nextBatch);
  // Start a new array so the same batch can never be queued twice.
  nextBatch = [];
  return true;
};

// Collects incoming lines into batches of `batchSize` lines. When NUMBER is
// set, lines beyond that limit are ignored and the partial batch collected so
// far is queued exactly once.
rl.on('line', (line) => {
  const shouldShovelMore = !number || batchedLines < number;
  if (shouldShovelMore) {
    nextBatch.push(line);
    batchedLines += 1;
    if (nextBatch.length >= batchSize && flushNextBatch()) {
      rl.emit('batchPrepared');
    }
    return;
  }
  if (flushNextBatch()) {
    rl.emit('batchPrepared');
  }
});

// When the input is exhausted, a trailing batch smaller than `batchSize` may
// still be pending; queue it so the last lines of the file are sent too.
// The batch is handed over directly instead of via 'batchPrepared', because
// that path may pause the reader, which is pointless once it has closed.
rl.on('close', () => {
  if (!flushNextBatch()) {
    return;
  }
  const shoveler = findFreeShoveler();
  if (shoveler) {
    shoveler.shovel(preparedBatches.shift());
  }
  // Otherwise a busy shoveler picks the batch up from the queue when its
  // current request is done.
});
// A batch was queued: hand the oldest queued batch to an idle shoveler, or
// pause reading when every shoveler is busy.
rl.on('batchPrepared', () => {
  const idleShoveler = findFreeShoveler();
  if (!idleShoveler) {
    rl.pause();
    return;
  }
  if (preparedBatches.length === 0) {
    return;
  }
  idleShoveler.shovel(preparedBatches.shift());
});
// A batch was confirmed: print statistics and exit successfully once either
// the requested number of lines or the whole file has been confirmed.
rl.on('batchConsumed', () => {
  printStats();

  const limitHit = number !== 0 && confirmedLines >= number;
  const fileDrained = Boolean(isFileClosed) && confirmedLines >= batchedLines;
  if (!limitHit && !fileDrained) {
    return;
  }

  p(`Ok, finally ${confirmedLines} lines consumed, we're there!`);
  process.exit(0);
});
// Simple error handling: the line reader and the file stream share one listener.
for (const source of [rl, fileStream]) {
  source.on('error', errorListener);
}

// Raise the flag once the file has ended, so completion can be detected.
fileStream.on('end', function markFileClosed() {
  isFileClosed = true;
});
/**
 * Helper for doing a single HTTP request.
 *
 * The returned request emits a custom 'complete' event with the response
 * object once the whole response body has been read; the decoded body is
 * attached to it as `res.data`.
 *
 * @param {string} hostname - target host name or address.
 * @param {string|number} port - target port.
 * @param {string} path - request path.
 * @param {string} method - HTTP method.
 * @param {string} [data] - request body; when omitted no body is written and
 *   no content headers are set.
 * @returns {http.ClientRequest} the already-ended request.
 */
const sendRequest = (hostname, port, path, method, data) => {
  const requestOptions = {
    host: hostname,
    port,
    path,
    method,
    headers: data ? {
      'Content-Type': 'text/plain; encoding=utf8',
      'Content-Length': Buffer.byteLength(data),
    } : {},
  };
  const req = http.request(requestOptions, (res) => {
    // With an encoding set, 'data' chunks arrive as strings.
    res.setEncoding('utf8');
    const chunks = [];
    res.on('data', (chunk) => {
      chunks.push(chunk);
    });
    res.on('end', () => {
      res.data = chunks.join('');
      req.emit('complete', res);
    });
  });
  // write() rejects undefined/null, so a body-less request must skip it.
  if (data != null) {
    req.write(data);
  }
  req.end();
  return req;
};
// Last-resort handler: report anything that escaped to stderr (the stack
// when available) and terminate with exit code 1.
function handleUncaughtException(err) {
  const details = err.stack || err;
  console.error('uncaught exception:', details);
  process.exit(1);
}

process.on('uncaughtException', handleUncaughtException);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment