Created
July 6, 2017 10:33
-
-
Save BingjieGao/2371f4d47a75839342fa2b3745da731d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module.exports = (function() { | |
"use strict"; | |
const _ = require("lodash"); | |
const safeBands = require("./safe-age"); | |
const Promise = require("bluebird"); | |
const split = require("split"); | |
let tdxApi; | |
/** | |
* | |
* @param {*} serviceRatioId - datasetId refers to serviceRatio | |
* @param {*} populationId - datasetId refers to poplets data, default one is S1gCLZJxNe | |
* @param {*} networkId - datasetId refers to network that contains a set of opened services | |
*/ | |
const areaServiceDatabot = function(input, output, context, deststream) { | |
/** | |
* @param {*} areaServiceDataset - the unique areaServiceId points | |
* to a document in the area service lookup | |
* schema is | |
* { | |
* areaServiceId: "string", | |
* populationId: "string" | |
* networkId: "string", | |
* behaviourFunctionId: "string", | |
* } | |
* @param {*} araeServiceId - UID | |
* @param {*} ccgCode - within this ccg area | |
*/ | |
const areaServiceDataset = context.packageParams.areaServiceDataset; | |
const areaServiceId = input.areaServiceId; | |
// const ccgCode = input.ccgCode; | |
tdxApi = context.tdxApi; | |
const mappingId = input.mappingId; | |
let populationId; | |
let networkId; | |
let serviceRatioId; | |
// a set of serviceArray from networkId | |
let serviceArray = null; | |
let areaArray = null; | |
let ratioData = null; | |
return tdxApi.getDistinctAsync(mappingId, "parentId", {parentType: "CCG15CD", childType: "LSOA11CD", mappingType: "ons-mapping"}, null, null) | |
.then((response) => { | |
console.log(`retrive ccg length is ${response.data.length}`); | |
return Promise.each(response.data, (ccgCode) => { | |
// requesting data according to each senario | |
return tdxApi.getDatasetDataAsync(areaServiceDataset, {areaServiceId}, null, null) | |
.then((result) => { | |
if (!result || result.data.length !== 1) { | |
output.debug(`NO result requesting from areaServiceDataset with input UUID ${areaServiceId}`); | |
} else { | |
populationId = result.data[0].populationId; | |
networkId = result.data[0].networkId; | |
serviceRatioId = input.behaviourId; | |
const projection = { | |
serviceId: 1, | |
}; | |
return tdxApi.getDatasetDataAsync(networkId, null, projection, {limit: 0}); | |
} | |
}) | |
.then((response) => { | |
// get all lsoa areas within given ccgCode | |
if (!response || response.data.length === 0) { | |
output.debug("NO result from network datset"); | |
} else { | |
// a set of serviceIds | |
serviceArray = _.map(response.data, "serviceId"); | |
// boundary look up dataset from tdx | |
// const patientsMapping = context.packageParams.patientsMapping; | |
// matching pipeline, all lsoa areas $in this ccgCode | |
// const pipeline = [ | |
// { | |
// $match: { | |
// parentId: ccgCode, | |
// }, | |
// }, | |
// { | |
// $group: { | |
// _id: null, | |
// childArray: { | |
// $push: "$childId", | |
// }, | |
// }, | |
// }, | |
// ]; | |
const patientsFilter = { | |
parentId: ccgCode, | |
mappingType: "patients-mapping", | |
childType: "LSOA11CD", | |
}; | |
const patientsProjection = { | |
childId: 1, | |
}; | |
// retriving all childIds as a an array with parentId matching given ccg code | |
return tdxApi.getDatasetDataAsync(mappingId, patientsFilter, patientsProjection, {limit: 0}); | |
// a set of LSOA areas | |
} | |
}) | |
.then((response) => { | |
if (!response || response.data.length === 0) { | |
output.debug("NO valid matching data from boundary lookup"); | |
} else { | |
// lsoa areas filtered by given ccg code | |
// from gp view, the total poplets registered in gps within this ccg area | |
// gp points in this ccg code have patients from | |
// lsoas $in areaArray | |
// response is {data: [{childId:}]} | |
areaArray = _.map(response.data, "childId"); | |
// resuest all serviceRatio data within this areaArray | |
const filter = { | |
// this need to be fixed, the new field should be areaId instead of _ | |
areaId: { | |
$in: areaArray, | |
}, | |
ageBand: { | |
$in: safeBands, | |
}, | |
}; | |
// retriving ratio data $in the set of LSOA codes = areaArray | |
// data schema | |
// e.g. | |
// { | |
// area_id: "string", | |
// gender: "string", | |
// age_band: "string", | |
// ratio: "object", | |
// } | |
return tdxApi.getDatasetDataAsync(serviceRatioId, filter, null, {limit: 0}); | |
} | |
}) | |
.then((response) => { | |
if (!response || response.data.length === 0) { | |
output.debug("NO valid serviceRatio data available"); | |
} else { | |
ratioData = response.data; | |
console.log(`ratioData length is ${ratioData.length}`); | |
const popletFilter = { | |
area: { | |
$in: areaArray, | |
}, | |
age: { | |
$in: safeBands, | |
}, | |
}; | |
const ratioObject = dicLookup(ratioData, serviceArray); | |
// retriving poplet data within this areaId array | |
return streamResource(populationId, popletFilter, ratioObject, areaServiceId, deststream); | |
// return tdxApi.getDatasetDataAsync(populationId, popletFilter, null, {limit: 0}); | |
} | |
}) | |
// .then((response) => { | |
// if (!response || response.data.length === 0) { | |
// output.debug("NO valid poplet data available"); | |
// } else { | |
// console.log(`poplet data length is ${response.data.length}`); | |
// const ratioObject = dicLookup(ratioData, serviceArray); | |
// ratioData = null; | |
// serviceArray = null; | |
// return mapData(response.data, ratioObject, areaServiceId, deststream); | |
// } | |
// }) | |
.catch((err) => { | |
output.debug(`requesting tdx data with err ${err.message}`); | |
}); | |
}); | |
}); | |
}; | |
const streamResource = function(resourceId, filter, ratioObject, areaServiceId, deststream) { | |
return new Promise((resolve, reject) => { | |
let countor = 0; | |
tdxApi.getNDDatasetData(resourceId, filter, null, {limit: 0}) | |
.pipe(split(JSON.parse, null, {trailing: true})) | |
.on("data", (data) => { | |
countor += 1; | |
processor(data, ratioObject, areaServiceId, deststream); | |
}) | |
.on("end", () => { | |
console.log(`countor is ${countor}`); | |
resolve(); | |
}) | |
.on("error", reject); | |
}); | |
}; | |
const processor = function(populationData, ratioObject, areaServiceId, deststream) { | |
const {area, gender, year, age, data} = populationData; | |
const key = keyHelper(area, gender, age); | |
if (ratioObject[key]) { | |
const outputObject = { | |
areaServiceId: areaServiceId, | |
areaId: area, | |
gender: gender, | |
year: year, | |
ageBand: age, | |
}; | |
_.forEach(ratioObject[key], (ratio, serviceId) => { | |
// add serviceId field to each original poplet object | |
outputObject.serviceId = serviceId; | |
// multiply with service ratio and persons each areaId, each demographic | |
outputObject.count = ratio * data; | |
deststream.write(`${JSON.stringify(outputObject)}\n`); | |
}); | |
} | |
}; | |
/** | |
* parameter fields are: | |
* @param {*} populationData - area_id, gender, age_band, year, persons | |
* @param {*} ratioData - area_id, gender, age_band, year, persosns | |
* for each mapped area_id, each demographic, | |
* multiply with original/renormalized ratio from ratioData | |
* and actual number of people in populationData | |
*/ | |
function mapData(populationData, ratioObject, areaServiceId, deststream) { | |
// re-format the ratioData from an array to a dictionary lookup object | |
// in order to do the multiplications with poplet data | |
return new Promise((resolve, reject) => { | |
// const recursive = function(index) { | |
// if (index < populationData.length) { | |
// const populationObj = populationData[index]; | |
// const key = keyHelper(populationObj.area_id, populationObj.gender, populationObj.age_band); | |
// if (ratioObject[key]) { | |
// const outputObject = { | |
// areaServiceId: areaServiceId, | |
// areaId: populationObj.area_id, | |
// gender: populationObj.gender, | |
// year: populationObj.year, | |
// ageBand: populationObj.age_band, | |
// }; | |
// _.forEach(ratioObject[key], (ratio, serviceId) => { | |
// // add serviceId field to each original poplet object | |
// outputObject.serviceId = serviceId; | |
// // multiply with service ratio and persons each areaId, each demographic | |
// outputObject.count = ratio * populationObj.persons; | |
// deststream.write(`${JSON.stringify(outputObject)}\n`); | |
// }); | |
// } | |
// index += 1; | |
// recursive(index); | |
// } | |
// }; | |
// recursive(0); | |
}); | |
} | |
function dicLookup(ratioData, serviceArray) { | |
let renormalize; | |
// Ratios are formed according to each available service. | |
// If one or more services are being closed, the ratios need to be re-normalised | |
if (serviceArray.length === 1 && serviceArray[0] === "all") { | |
// no service within this ccg code is closed | |
// no need to re-normalize | |
renormalize = false; | |
} else { | |
renormalize = true; | |
} | |
const returnObject = {}; | |
_.forEach(ratioData, (ratioObj) => { | |
let sum = 0; | |
ratioObj.gender = ratioObj.gender === "female" ? "f" : "m"; | |
const key = keyHelper(ratioObj.areaId, ratioObj.gender, ratioObj.ageBand); | |
// const key = `${ratioObj.area_id}${ratioObj.gender}${ratioObj.age_band}`; | |
if (!returnObject[key]) returnObject[key] = {}; | |
ratioObj.serviceRatio = renormalize ? _.pick(ratioObj.serviceRatio, serviceArray) : ratioObj.serviceRatio; | |
// re-calculating ratios | |
// calculate sum of ratios with picked services | |
_.forEach(ratioObj.serviceRatio, (ratio, serviceId) => { | |
sum += ratio; | |
}); | |
// re-normalize service ratio | |
_.forEach(ratioObj.serviceRatio, (ratio, serviceId) => { | |
returnObject[key][serviceId] = ratio / sum; | |
}); | |
}); | |
return returnObject; | |
} | |
/** | |
* | |
* @param {*} arguments - take any number of arguments form a string key | |
*/ | |
const keyHelper = function() { | |
const length = arguments.length; | |
let rString = ""; | |
_.forEach(arguments, (val, i) => { | |
if (i === length - 1) rString += `${val}`; | |
else rString += `${val},`; | |
}); | |
return rString; | |
}; | |
return areaServiceDatabot; | |
}()); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment