-
-
Save psych0der/a54e096b96bcaa2d0cdbb736d180c73f to your computer and use it in GitHub Desktop.
Network promise chain resolver in Node.js
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Creates and resolves a chain of promises of network requests. The chain resolver can be
 * used to resolve a composite collection of network requests arranged in parallel or in
 * synchronous sequence. Provides ahead-of-time response resolution to construct pipelines.
 *
 * Usage:
 *   let ChainResolver = require('./utils/ChainResolver')(G);
 *   let pipelineChain = [{
 *       'id': 'bookCMSValidate',
 *       'method': 'GET',
 *       'service': 'cmsProcessor',
 *       'path': '/api/v1/process/books/bookId/validate/',
 *       'json': true
 *   }, {
 *       'id': 'bookUserValidate',
 *       'method': 'GET',
 *       'service': 'userService',
 *       'path': '/books/bookId/validate-for-publish/',
 *       'json': true
 *   }];
 *
 *   let topology = {
 *       'requestFormation': {
 *           'pipeline': pipelineChain
 *       },
 *       'responseFormation': {
 *           'successScheme': 'all',
 *           'responsePayload': {
 *               'cmsResponseBody': '$$bookCMSValidate.body',
 *               'userResponseBody': '$$bookUserValidate.body'
 *           }
 *       }
 *   }
 *   return ChainResolver(topology, req)
 *       .then(data => {
 *           let cmsResponseBody = data.payload.cmsResponseBody;
 *           let userResponseBody = data.payload.userResponseBody;
 *           let resBody = {
 *               "success": "success",
 *               payload: {
 *                   "is_valid": cmsResponseBody.data.is_valid && userResponseBody.data.is_valid,
 *                   "errors": Object.assign({}, {
 *                       'bookContent': cmsResponseBody.data.error
 *                   }, userResponseBody.data.error)
 *               }
 *           };
 *           return Promise.resolve(resBody);
 *       })
 *       .catch(err => {
 *           return Promise.reject(err);
 *       })
 *
 */
let rp = require('request-promise'), | |
template = require('es6-template-strings'), | |
traverse = require('traverse'), | |
objects = require('./Objects'), | |
logger = require('./logger'); | |
/* Attribute helpers taken from the local Objects utility module */
let getAttr = objects.getAttr,
    hasAttr = objects.hasAttr,
    setAttr = objects.setAttr;
/* Reference to the global object; stays null until initialize() runs */
let G = null;
/* Service configuration map; stays empty until initialize() replaces it */
let serviceRegistry = {};
/**
 * Remembers the global object and caches its service configuration map.
 * The map is what the request makers later read per-service settings
 * (`host`, `auth`) from.
 * @param {Object} GLOBAL_OBJECT global object exposing `constants.serviceConfigMap`
 */
let initialize = (GLOBAL_OBJECT) => {
    /* Keep the reference first so G is set even if the lookup below throws */
    G = GLOBAL_OBJECT;
    const constants = G['constants'];
    serviceRegistry = constants['serviceConfigMap'];
};
/**
 * Chain resolver function resolving a chain of requests arranged in parallel or sequential order.
 *
 * `chainTopology` must define both the request structure and the response structure:
 *   - `requestFormation.pipeline` is an array. Each entry is either a request object (fired
 *     sequentially) or an array of request objects (fired in parallel). Every request object
 *     needs `id`, `service`, `path` and `method`; `headers`, `queryString`, `payload`,
 *     `timeout` and `json` are optional.
 *   - `responseFormation.responsePayload` is an arbitrary JSON tree. Any string in it that starts
 *     with `$$` is replaced by the value found at that attribute path in the collected
 *     responses, e.g. `$$bookCMSValidate.body`.
 * The unique `id` of a request is what makes this substitution possible: a request can use
 * response values (headers, body) of earlier requests in its own path and payload.
 * If any request in the chain fails, the whole chain fails.
 *
 * Neither `chainTopology` nor the request objects inside it are modified, so a topology can be
 * reused for several calls.
 *
 * @param {Object} chainTopology Object describing topology of request chain
 * @param {Object} request express request object; `session.authToken` and `session.userId` are read
 *                         when each request is built
 * @return {Promise} Resolves with `{status: 'success', payload}`. Rejects with
 *                   `{status: 'failed', message}` for an invalid topology and with
 *                   `{status: 'failed', message, statusCode: 503}` when a request in the chain fails
 */
let ChainResolver = (chainTopology, request) => {
    /* Responses of all requests, keyed by request id; shared by every request in this call */
    const responseBucket = {};

    /* validating chainTopology */
    if (chainTopology === null || typeof chainTopology !== 'object') {
        return Promise.reject({'status': 'failed', 'message': 'Required parameters not sent'});
    }
    const requiredFields = ['requestFormation', 'responseFormation'];
    /* Both fields must be real objects, otherwise the key checks below would throw */
    const requiredFieldsPresent = requiredFields.every((field) => {
        const value = chainTopology[field];
        return value !== null && typeof value === 'object';
    });
    if (!requiredFieldsPresent) {
        return Promise.reject({'status': 'failed', 'message': 'Required parameters not sent'});
    }

    /* Destructure chainTopology to extract child objects */
    const {requestFormation, responseFormation} = chainTopology;

    /* check if requestFormation or responseFormation is empty */
    if (Object.keys(requestFormation).length === 0 || Object.keys(responseFormation).length === 0) {
        return Promise.reject({'status': 'failed', 'message': 'Request or response structure is an empty object'});
    }
    if (!('pipeline' in requestFormation)) {
        return Promise.reject({'status': 'failed', 'message': 'Pipeline is missing in requestFormation object'});
    }

    const requiredCompositeAttributes = ['id', 'service', 'path', 'method'];

    /**
     * Replaces template strings starting with `$$` with values from the corresponding request's
     * response object.
     * @param {*} object JSON tree (or single value) containing template strings
     * @return {*} A copy of `object` with the template strings substituted; `object` itself is
     *             left untouched so the topology can be reused
     */
    const replaceTemplateAttributes = (object) => {
        return traverse(object).map(function (val) {
            if (typeof val === 'string' && val.startsWith('$$')) {
                /* `true` stops the walk here, so substituted response data is not scanned for templates itself */
                this.update(getAttr(responseBucket, val.slice(2)), true);
            }
        });
    };

    /**
     * Validates a request object and returns a request maker function, which creates and returns
     * the request promise when called. The maker must only be called once the requests it
     * depends on have resolved, because path and payload may reference their responses.
     * @param {Object} requestObject Object containing data to create a new request
     * @return {Function} request maker returning the request promise
     * @throws {Error} when the entry is not an object, a required attribute is missing or the
     *                 service is not registered
     */
    const createRequestMaker = (requestObject) => {
        if (requestObject === null || typeof requestObject !== 'object' || Array.isArray(requestObject)) {
            throw new Error('Pipeline entries should be request objects or arrays of request objects');
        }
        requiredCompositeAttributes.forEach((attr) => {
            if (!(attr in requestObject)) {
                throw new Error('Missing attr `' + attr + '` in pipeline object');
            }
        });
        /* validate request object for service passed */
        if (!(requestObject.service in serviceRegistry)) {
            throw new Error('Service `' + requestObject.service + '` is not a valid service');
        }
        /* reserve a slot in the response bucket for this request id */
        responseBucket[requestObject.id] = {};

        /* The closure keeps the right requestObject in scope when the request is finally fired */
        return () => {
            const serviceConfig = serviceRegistry[requestObject.service];
            /* Substitute path fragments that are `$$` templates with previous responses' values */
            const updatedPath = requestObject.path
                .split('/')
                .map((fragment) => replaceTemplateAttributes([fragment])[0])
                .join('/');
            /* `json` defaults to true but an explicit false is respected */
            const useJson = requestObject.json == null ? true : requestObject.json;
            const payload = replaceTemplateAttributes(requestObject.payload);
            const requestOptions = {
                uri: template(serviceConfig.host + updatedPath, {userId: request.session.userId}),
                headers: Object.assign({}, {
                    'Authorization': 'Basic ' + request.session.authToken
                }, serviceConfig.auth, requestObject.headers),
                qs: Object.assign({}, requestObject.queryString),
                timeout: requestObject.timeout || 5000,
                method: requestObject.method,
                json: useJson,
                resolveWithFullResponse: true
            };
            /*
             * The empty-object fallback is only applied to JSON requests.
             * NOTE(review): a plain object body is presumably rejected by the request library
             * when `json` is false - confirm against its documentation.
             */
            const body = payload || (useJson ? {} : undefined);
            if (body !== undefined) {
                requestOptions.body = body;
            }
            return rp(requestOptions).then((response) => {
                responseBucket[requestObject.id] = Object.assign({}, response);
            }).catch((err) => {
                logger.debug('request failed for: ' + requestObject.id);
                throw err;
            });
        };
    };

    /**
     * Creates a promise chain out of a pipeline of request objects. Plain request objects are
     * fired sequentially; request objects wrapped in an array are fired in parallel.
     * E.g. [ {R1}, {R2}, [ {R3}, {R4} ], {R5} ] fires R1, then R2, then R3 & R4 together and,
     * once both have resolved, R5. To fire everything in parallel use [ [ {R1}, {R2}, {R3} ] ].
     * @param {Array} pipeline Pipeline array containing request objects in a particular sequence
     * @return {Promise} Resolves when the final stage has resolved; rejects as soon as any
     *                   request in the pipeline fails
     * @throws {Error} when the pipeline or any of its entries is invalid
     */
    const createPromiseChain = (pipeline) => {
        if (!Array.isArray(pipeline)) {
            throw new Error('Pipeline should be an array');
        }
        /*
         * Validate the whole pipeline before chaining anything, so an invalid entry late in the
         * pipeline cannot leave earlier requests scheduled on an abandoned chain.
         */
        const stages = pipeline.map((entry) => {
            if (Array.isArray(entry)) {
                return entry.map((item) => createRequestMaker(item));
            }
            return createRequestMaker(entry);
        });
        return stages.reduce((previous, stage) => {
            return previous.then(() => {
                if (Array.isArray(stage)) {
                    /* Parallel stage: fire all makers now and wait for every one of them */
                    return Promise.all(stage.map((makeRequest) => makeRequest()));
                }
                return stage();
            });
        }, Promise.resolve('init'));
    };

    /* Construct chain. Throws when any request object in the pipeline is invalid */
    let chainPromise;
    try {
        chainPromise = createPromiseChain(requestFormation.pipeline);
    } catch (err) {
        logger.exception(err);
        return Promise.reject({'status': 'failed', 'message': err.message});
    }

    return chainPromise.then(() => {
        /* Build the response payload from the template once every request has resolved */
        return {
            'status': 'success',
            'payload': replaceTemplateAttributes(responseFormation['responsePayload'])
        };
    }).catch((err) => {
        if (err instanceof Error) {
            logger.exception(err);
        } else {
            logger.exception(JSON.stringify(err));
        }
        return Promise.reject({'status': 'failed', 'message': 'Unable to perform operation', 'statusCode': 503});
    });
};
/**
 * Module factory: wires the resolver to the given global object and hands
 * back the ChainResolver function.
 * @param {Object} GLOBAL_OBJECT global object passed on to initialize()
 * @return {Function} the ChainResolver function
 */
const configureChainResolver = (GLOBAL_OBJECT) => {
    initialize(GLOBAL_OBJECT);
    return ChainResolver;
};
module.exports = configureChainResolver;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment