Skip to content

Instantly share code, notes, and snippets.

@fed135
Created July 18, 2019 19:13
Show Gist options
  • Save fed135/d078448650c71951f53580f16255e3d4 to your computer and use it in GitHub Desktop.
Save fed135/d078448650c71951f53580f16255e3d4 to your computer and use it in GitHub Desktop.
Forked HTTP
/**
* Multi-process request
*/
/* Requires ------------------------------------------------------------------*/
const crypto = require('crypto');
const path = require('path');
const fork = require('child_process').fork;
/* Local variables -----------------------------------------------------------*/
const poolSize = process.env.NODE_NET_POOL_SIZE || 10;
const queue = new Map();
const workerPath = path.join(__dirname, './worker');
const pool = Array.from(new Array(poolSize))
.map(() => fork(workerPath, { env: process.env }));
let inc = 0;
/* Methods -------------------------------------------------------------------*/
/**
* Parses options, then creates a handle for an async net call
* @see https://nodejs.org/api/http.html#http_http_request_options_callback
* @param {object} opts The options for the network call
* @returns {Promise}
*/
function send(opts) {
opts = opts || {};
// Check for bad options
if (opts.body !== undefined) {
if (opts.json === true) {
if (!Buffer.isBuffer(opts.body)) {
opts.body = JSON.stringify(opts.body);
}
}
else {
if (!Buffer.isBuffer(opts.body) && typeof opts.body !== 'string') {
throw new Error('Invalid option: body needs to be either a Buffer or String, unless json:true is provided.');
}
}
}
if (opts.agent !== undefined) {
throw new Error('Invalid option: agent cannot be changed.');
}
if (opts.createConnection !== undefined) {
throw new Error('Invalid option: createConnection cannot be changed.');
}
if (opts.protocol !== undefined) {
if (opts.protocol !== 'http:' && opts.protocol !== 'https:') {
throw new Error('Invalid option: protocol needs to be either "http:" or "https:".');
}
}
// Create uuid
const uuid = crypto.randomBytes(8).toString('hex');
const promise = new Promise((resolve, reject) => {
queue.set(uuid, {
resolve,
reject,
json: opts.json,
});
});
opts.uuid = uuid;
// Send to worker
pool[inc].send(opts);
// Load balance
if (inc < poolSize -1) inc++;
else inc = 0;
return promise;
}
/**
* Handles completed requests comming back from workers
* @private
* @param {object} query The query object
*/
function resolve(query) {
const handler = queue.get(query.uuid);
if (query.error === undefined) {
if (handler.json === true) query.body = JSON.parse(query.body);
handler.resolve(query);
}
else {
handler.reject(query.error);
}
queue.delete(query.uuid);
}
/* Init ----------------------------------------------------------------------*/
pool.forEach(f => f.on('message', resolve));
/* Exports -------------------------------------------------------------------*/
module.exports = { send };
/**
* Request http worker
*/
/* Requires ------------------------------------------------------------------*/
const http = require('http');
const https = require('https');
const url = require('url');
/* Local variables -----------------------------------------------------------*/
const keepAliveOptions = {
keepAlive: true,
keepAliveMsecs: process.env.NODE_KEEP_ALIVE_MSEC || 5000,
};
const protocols = {
'http:': {
client: http,
agent: new http.Agent(keepAliveOptions),
},
'https:': {
client: https,
agent: new https.Agent(keepAliveOptions),
},
};
/* Methods -------------------------------------------------------------------*/
/**
* Worker method to create the http/https request
* @internal
* @param {object} opts The options for the request
* @returns {Promise}
*/
function send(opts) {
return Promise.resolve().then(() => {
if (opts.url) opts = Object.assign(opts, url.parse(opts.url));
opts.agent = protocols[opts.protocol || 'http:'].agent;
const errorHandler = handleError.bind(null, opts.uuid);
const req = protocols[opts.protocol || 'http:'].client.request(opts, handleResponse.bind(null, opts.uuid));
if (opts.body) req.write(opts.body);
req.on('error', errorHandler);
req.setTimeout(5000, errorHandler)
req.end();
}).catch(handleError.bind(null, opts.uuid));
}
/**
* Worker method to handle request responses
* @private
* @param {string} uuid The unique identifier for the request
* @param {object} res The response object
* @returns {Promise}
*/
function handleResponse(uuid, res) {
return Promise.resolve().then(() => {
const response = {
uuid,
headers: res.headers,
statusCode: res.statusCode,
body: [],
};
res.on('data', chunk => response.body.push(chunk));
res.on('end', () => {
response.body = response.body.join('');
process.send(response);
});
}).catch(handleError.bind(null, uuid));
}
/**
* Worker method to handle request errors
* @private
* @param {string} uuid The unique identifier for the request
* @param {Error} error The error
*/
function handleError(uuid, error) {
process.send({
uuid,
error,
});
}
/* Init ----------------------------------------------------------------------*/
process.title = 'http-worker';
process.on('message', send);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment