Skip to content

Instantly share code, notes, and snippets.

@psych0der
Created August 10, 2018 11:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save psych0der/a54e096b96bcaa2d0cdbb736d180c73f to your computer and use it in GitHub Desktop.
Save psych0der/a54e096b96bcaa2d0cdbb736d180c73f to your computer and use it in GitHub Desktop.
Network promise chain resolver in Node.js
/**
* Creates and resolves a chain of promises for network requests. The chain resolver can be
* used to resolve a composite collection of network requests arranged in parallel or sequential
* order. Provides ahead-of-time response resolution to construct pipelines.
*
* Usage:
* let ChainResolver = require('./utils/ChainResolver')(G);
* let pipelineChain = [{
* 'id': 'bookCMSValidate',
* 'method': 'GET',
* 'service': 'cmsProcessor',
* 'path': '/api/v1/process/books/bookId/validate/',
* 'json': true
* }, {
* 'id': 'bookUserValidate',
* 'method': 'GET',
* 'service': 'userService',
* 'path': '/books/bookId/validate-for-publish/',
* 'json': true
* }];
*
* let topology = {
* 'requestFormation': {
* 'pipeline': pipelineChain
* },
* 'responseFormation': {
* 'successScheme': 'all',
* 'responsePayload': {
* 'cmsResponseBody': '$$bookCMSValidate.body',
* 'userResponseBody': '$$bookUserValidate.body'
* }
* }
* }
* return ChainResolver(topology, req)
* .then(data => {
* let cmsResponseBody = data.payload.cmsResponseBody;
* let userResponseBody = data.payload.userResponseBody;
* let resBody = {
* "success": "success",
* payload: {
* "is_valid": cmsResponseBody.data.is_valid && userResponseBody.data.is_valid,
* "errors": Object.assign({}, {
* 'bookContent': cmsResponseBody.data.error
* }, userResponseBody.data.error)
* }
* };
* return Promise.resolve(resBody);
* })
* .catch(err => {
* return Promise.reject(err);
* })
*
*/
let rp = require('request-promise'),
template = require('es6-template-strings'),
traverse = require('traverse'),
objects = require('./Objects'),
logger = require('./logger');
/* Pull the attribute helper methods out of the Objects utility module */
const {getAttr, hasAttr, setAttr} = objects;
/* Reference to the global object; stays null until initialize() is called */
let G = null;
/* Service settings looked up by service name; replaced by initialize() */
let serviceRegistry = {};
/**
 * Stores the global object and caches its service configuration map.
 * @param {Object} GLOBAL_OBJECT object exposing `constants.serviceConfigMap`
 */
let initialize = (GLOBAL_OBJECT) => {
    G = GLOBAL_OBJECT;
    /* The service map supplies the host settings used for network request operations */
    const {constants} = G;
    serviceRegistry = constants.serviceConfigMap;
};
/**
 * Resolves a chain of network requests arranged in sequential and/or parallel order.
 *
 * `chainTopology` describes both the requests and the shape of the final response:
 *   - `requestFormation.pipeline` is an Array. A plain request object is fired after the
 *     previous stage has resolved; an Array of request objects is fired in parallel and the
 *     pipeline continues once all of them have resolved.
 *     eg. [ {R1}, [ {R2}, {R3} ], {R4} ] fires R1, then R2 & R3 together, then R4.
 *   - every request object needs `id`, `service`, `path` and `method`. `headers`,
 *     `queryString`, `timeout` and `payload` are optional.
 *   - `responseFormation.responsePayload` is a JSON tree describing the resolved payload.
 *
 * Any string starting with `$$` inside a path fragment, a request payload or the response
 * payload is replaced with `getAttr(responseBucket, <string without $$>)`, where
 * `responseBucket` maps each request `id` to the full response of that request
 * (eg. `$$bookCMSValidate.body`). The caller's topology object is not modified.
 *
 * If any request fails, the whole chain fails.
 *
 * @param {Object} chainTopology Object describing topology of request chain
 * @param {Object} request express request object; `request.session.authToken` and
 *                         `request.session.userId` are read when a request is fired
 * @return {Promise} Resolves with `{status: 'success', payload}`. Rejects with
 *                   `{status: 'failed', message}` when the topology is invalid, or with
 *                   `{status: 'failed', message: 'Unable to perform operation', statusCode: 503}`
 *                   when a request in the chain fails.
 */
let ChainResolver = (chainTopology, request) => {
    /* Every problem detected before a request is fired is reported with this shape */
    const rejectWith = (message) => Promise.reject({'status': 'failed', 'message': message});

    /* validating chainTopology */
    if (chainTopology === null || typeof chainTopology !== 'object') {
        return rejectWith('Required parameters not sent');
    }
    const requiredFields = ['requestFormation', 'responseFormation'];
    const requiredFieldsPresent = requiredFields.every((field) => {
        const value = chainTopology[field];
        return value !== undefined && value !== null && value !== '';
    });
    if (!requiredFieldsPresent) {
        return rejectWith('Required parameters not sent');
    }

    const {requestFormation, responseFormation} = chainTopology;
    /* check if requestFormation or responseFormation is empty */
    if (Object.keys(requestFormation).length === 0 || Object.keys(responseFormation).length === 0) {
        return rejectWith('Request or response structure is an empty object');
    }
    const pipeline = requestFormation.pipeline;
    if (pipeline === undefined || pipeline === null) {
        return rejectWith('Pipeline is missing in requestFormation object');
    }
    if (!Array.isArray(pipeline)) {
        return rejectWith('Pipeline in requestFormation object must be an array');
    }

    /* Scoped store of responses, keyed by request id, shared by all requests of this chain */
    const responseBucket = {};
    const requiredCompositeAttributes = ['id', 'service', 'path', 'method'];

    /**
     * Replaces template strings starting with `$$` with values from the response bucket.
     * Works on a copy (`traverse(...).map`), so the tree passed in is left untouched and a
     * topology object can be reused across calls.
     * @param {*} tree json tree (or single value) possibly containing template strings
     * @return {*} copy of `tree` with every `$$` string substituted
     */
    const replaceTemplateAttributes = (tree) => {
        /* `function` (not arrow) because traverse exposes its API through `this` */
        return traverse(tree).map(function (val) {
            if (typeof val === 'string' && val.startsWith('$$')) {
                /* stopHere=true: do not walk into (or rewrite) the substituted response data */
                this.update(getAttr(responseBucket, val.slice(2)), true);
            }
        });
    };

    /**
     * Validates a request object and returns a function that fires the request when called.
     * The returned function must only be called once the requests it depends on have
     * resolved, because path and payload substitution happens at call time.
     * @param {object} requestObject Object containing data to create a new request
     * @return {function} request maker which returns the request promise when called
     * @throws {Error} when the entry is not an object, misses a required attribute or
     *                 names a service that is not in the service registry
     */
    const createRequestMaker = (requestObject) => {
        if (requestObject === null || typeof requestObject !== 'object' || Array.isArray(requestObject)) {
            throw new Error('Pipeline entries must be request objects or arrays of request objects');
        }
        requiredCompositeAttributes.forEach((attr) => {
            if (!(attr in requestObject)) {
                throw new Error('Missing attr `' + attr + '` in pipeline object');
            }
        });
        /* validate request object for service passed */
        if (!(requestObject.service in serviceRegistry)) {
            throw new Error('Service `' + requestObject.service + '` is not a valid service');
        }
        /* reserve a slot in the response bucket for this request id */
        responseBucket[requestObject.id] = {};

        return () => {
            /* Substitute `$$` path fragments with values from previous responses */
            const updatedPath = requestObject.path
                .split('/')
                .map((fragment) => replaceTemplateAttributes([fragment])[0])
                .join('/');
            const serviceConfig = serviceRegistry[requestObject.service];
            const requestOptions = {
                /*
                 * NOTE(review): the whole URI, including values substituted from earlier
                 * responses, goes through `template`. If response data can contain `${...}`
                 * it may be treated as a template expression - confirm this is acceptable.
                 */
                uri: template(serviceConfig.host + updatedPath, {userId: request.session.userId}),
                /* later sources win: default auth < service auth < request headers */
                headers: Object.assign({}, {
                    'Authorization': 'Basic ' + request.session.authToken
                }, serviceConfig.auth, requestObject.headers),
                qs: Object.assign({}, requestObject.queryString),
                timeout: requestObject.timeout || 5000,
                method: requestObject.method,
                body: replaceTemplateAttributes(requestObject.payload) || {},
                /*
                 * NOTE(review): this expression is always `true`, so `json: false` is ignored.
                 * Kept as is: honouring `false` would also require dropping the `{}` default
                 * body above.
                 */
                json: requestObject.json || true,
                resolveWithFullResponse: true
            };
            return rp(requestOptions).then((response) => {
                responseBucket[requestObject.id] = Object.assign({}, response);
            }).catch((err) => {
                logger.debug('request failed for: ' + requestObject.id);
                return Promise.reject(err);
            });
        };
    };

    /**
     * Builds the promise chain for a pipeline. All entries are validated before the first
     * promise is chained, so an invalid entry anywhere in the pipeline means that no
     * request is fired at all.
     * @param {Array} stagesSpec pipeline array of request objects / arrays of request objects
     * @return {Promise} resolves when the last stage has resolved, rejects as soon as any
     *                   request fails
     * @throws {Error} when any entry of the pipeline is invalid
     */
    const createPromiseChain = (stagesSpec) => {
        const stages = stagesSpec.map((entry) => {
            if (Array.isArray(entry)) {
                return entry.map((item) => createRequestMaker(item));
            }
            return createRequestMaker(entry);
        });
        return stages.reduce((previous, stage) => {
            return previous.then(() => {
                if (Array.isArray(stage)) {
                    /* parallel stage: fire every request now, continue when all resolved */
                    return Promise.all(stage.map((requestMaker) => requestMaker()));
                }
                return stage();
            });
        }, Promise.resolve('init'));
    };

    /* Construct chain. Building throws when any request object in the pipeline is invalid */
    let ChainPromise;
    try {
        ChainPromise = createPromiseChain(pipeline);
    } catch (err) {
        logger.exception(err);
        return rejectWith(err.message);
    }

    return ChainPromise.then(() => {
        return {'status': 'success', 'payload': replaceTemplateAttributes(responseFormation.responsePayload)};
    }).catch((err) => {
        if (err instanceof Error) {
            logger.exception(err);
        } else {
            logger.exception(JSON.stringify(err));
        }
        return Promise.reject({'status': 'failed', 'message': 'Unable to perform operation', 'statusCode': 503});
    });
};
/**
 * Module factory: wires the resolver to the application's global object and hands back
 * the ChainResolver function.
 * @param {Object} GLOBAL_OBJECT global object passed on to initialize()
 * @return {function} the ChainResolver function
 */
const createChainResolver = (GLOBAL_OBJECT) => {
    initialize(GLOBAL_OBJECT);
    return ChainResolver;
};
module.exports = createChainResolver;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment