Skip to content

Instantly share code, notes, and snippets.

@BingjieGao
Created July 6, 2017 10:33
Show Gist options
  • Save BingjieGao/2371f4d47a75839342fa2b3745da731d to your computer and use it in GitHub Desktop.
Save BingjieGao/2371f4d47a75839342fa2b3745da731d to your computer and use it in GitHub Desktop.
module.exports = (function() {
"use strict";
const _ = require("lodash");
const safeBands = require("./safe-age");
const Promise = require("bluebird");
const split = require("split");
let tdxApi;
/**
 * Databot entry point. For every distinct CCG parent id found in the mapping
 * dataset it looks up the LSOA areas mapped to that CCG, loads the service
 * ratios for those areas and streams the matching population records through
 * `streamResource`, which writes the scaled counts to `deststream`.
 *
 * The area-service lookup document is expected to look like
 * {
 *   areaServiceId: "string",
 *   populationId: "string",
 *   networkId: "string",
 *   behaviourFunctionId: "string",
 * }
 *
 * Side effect: stores `context.tdxApi` in the module-level `tdxApi` variable.
 *
 * @param {*} input - reads `areaServiceId` (UID of the lookup document),
 *                    `mappingId` (mapping dataset) and `behaviourId` (service ratio dataset)
 * @param {*} output - only `output.debug` is used, for diagnostics
 * @param {*} context - reads `packageParams.areaServiceDataset` and `tdxApi`
 * @param {*} deststream - handed through unchanged to `streamResource`
 * @returns {Promise} settles once every CCG has been processed, one after another.
 *   Rejects if the distinct-CCG request fails; any other request failure is
 *   reported through `output.debug` and processing carries on.
 */
const areaServiceDatabot = function(input, output, context, deststream) {
  const areaServiceDataset = context.packageParams.areaServiceDataset;
  const areaServiceId = input.areaServiceId;
  const mappingId = input.mappingId;
  const serviceRatioId = input.behaviourId;
  tdxApi = context.tdxApi;

  const logRequestError = (err) => {
    output.debug(`requesting tdx data with err ${err.message}`);
  };

  // Loads what is identical for every CCG: the population dataset id and the
  // list of service ids of the network. Resolves to null (after logging the
  // reason once) when either lookup yields nothing usable.
  const loadScenario = function() {
    return tdxApi.getDatasetDataAsync(areaServiceDataset, {areaServiceId}, null, null)
      .then((result) => {
        if (!result || result.data.length !== 1) {
          output.debug(`NO result requesting from areaServiceDataset with input UUID ${areaServiceId}`);
          return null;
        }
        const {populationId, networkId} = result.data[0];
        return tdxApi.getDatasetDataAsync(networkId, null, {serviceId: 1}, {limit: 0})
          .then((response) => {
            if (!response || response.data.length === 0) {
              output.debug("NO result from network datset");
              return null;
            }
            return {populationId, serviceArray: _.map(response.data, "serviceId")};
          });
      });
  };

  // Handles a single CCG. Each stage returns early when it has no data, so
  // only the stage that actually came up empty reports it.
  const processCcg = function(ccgCode, scenario) {
    const patientsFilter = {
      parentId: ccgCode,
      mappingType: "patients-mapping",
      childType: "LSOA11CD",
    };
    // all child ids (LSOA areas) whose parent is this CCG
    return tdxApi.getDatasetDataAsync(mappingId, patientsFilter, {childId: 1}, {limit: 0})
      .then((response) => {
        if (!response || response.data.length === 0) {
          output.debug("NO valid matching data from boundary lookup");
          return null;
        }
        const areaArray = _.map(response.data, "childId");
        const ratioFilter = {
          areaId: {
            $in: areaArray,
          },
          ageBand: {
            $in: safeBands,
          },
        };
        // service ratio records for the areas of this CCG
        return tdxApi.getDatasetDataAsync(serviceRatioId, ratioFilter, null, {limit: 0})
          .then((ratioResponse) => {
            if (!ratioResponse || ratioResponse.data.length === 0) {
              output.debug("NO valid serviceRatio data available");
              return null;
            }
            const ratioData = ratioResponse.data;
            console.log(`ratioData length is ${ratioData.length}`);
            const popletFilter = {
              area: {
                $in: areaArray,
              },
              age: {
                $in: safeBands,
              },
            };
            const ratioObject = dicLookup(ratioData, scenario.serviceArray);
            // stream the population records of these areas and write the scaled counts
            return streamResource(scenario.populationId, popletFilter, ratioObject, areaServiceId, deststream);
          });
      })
      // a failure in one CCG is logged and must not stop the remaining ones
      .catch(logRequestError);
  };

  return tdxApi.getDistinctAsync(mappingId, "parentId", {parentType: "CCG15CD", childType: "LSOA11CD", mappingType: "ons-mapping"}, null, null)
    .then((response) => {
      const ccgCodes = response.data;
      console.log(`retrive ccg length is ${ccgCodes.length}`);
      return loadScenario()
        .catch((err) => {
          logRequestError(err);
          return null;
        })
        .then((scenario) => {
          // nothing can be computed without the scenario; its absence was already logged
          if (!scenario) return ccgCodes;
          return Promise.each(ccgCodes, (ccgCode) => processCcg(ccgCode, scenario));
        });
    });
};
/**
 * Streams a dataset as newline-delimited JSON and passes every parsed record
 * to `processor`.
 *
 * @param {*} resourceId - id of the dataset to stream
 * @param {*} filter - query filter handed to `tdxApi.getNDDatasetData`
 * @param {*} ratioObject - handed through to `processor`
 * @param {*} areaServiceId - handed through to `processor`
 * @param {*} deststream - handed through to `processor`
 * @returns {Promise} resolves when the parsed stream ends; rejects when the
 *   source stream or the line parser reports an error
 */
const streamResource = function(resourceId, filter, ratioObject, areaServiceId, deststream) {
  // JSON.parse("") throws, so a blank piece (e.g. what is left after a final
  // newline) would surface as a stream error. split() does not queue a piece
  // whose mapper result is undefined, which lets blank pieces be skipped.
  const parseLine = (line) => (line.trim() === "" ? undefined : JSON.parse(line));
  return new Promise((resolve, reject) => {
    let countor = 0;
    const source = tdxApi.getNDDatasetData(resourceId, filter, null, {limit: 0});
    // pipe() does not forward errors to the destination, so the source needs
    // its own handler; without it a failed request is an unhandled "error" event
    source.on("error", reject);
    source
      .pipe(split(parseLine, null, {trailing: true}))
      .on("data", (data) => {
        countor += 1;
        processor(data, ratioObject, areaServiceId, deststream);
      })
      .on("end", () => {
        console.log(`countor is ${countor}`);
        resolve();
      })
      .on("error", reject);
  });
};
/**
 * Turns one population record into one output line per service.
 *
 * The record's `area`, `gender` and `age` form the lookup key into
 * `ratioObject`. When the key is present, every service ratio stored under it
 * is multiplied by the record's `data` value and written to `deststream` as a
 * single JSON line. Records without a matching key produce no output.
 *
 * @param {*} populationData - record with `area`, `gender`, `year`, `age` and `data`
 * @param {*} ratioObject - lookup from key to `{serviceId: ratio}`
 * @param {*} areaServiceId - copied into every output line
 * @param {*} deststream - target; only `write` is called
 */
const processor = function(populationData, ratioObject, areaServiceId, deststream) {
  const area = populationData.area;
  const gender = populationData.gender;
  const age = populationData.age;
  const persons = populationData.data;
  const serviceRatios = ratioObject[keyHelper(area, gender, age)];
  // no ratio known for this area/demographic: nothing to write
  if (!serviceRatios) return;
  const demographic = {
    areaServiceId,
    areaId: area,
    gender,
    year: populationData.year,
    ageBand: age,
  };
  _.forEach(serviceRatios, (share, serviceId) => {
    // service id and scaled head count are appended after the demographic fields
    const record = Object.assign({}, demographic, {serviceId, count: share * persons});
    deststream.write(`${JSON.stringify(record)}\n`);
  });
};
/**
 * Array-based counterpart of the streaming path: runs `processor` over every
 * record of an in-memory population array.
 *
 * Records must carry the fields `processor` reads (`area`, `gender`, `year`,
 * `age`, `data`). For each record whose area/gender/age key exists in
 * `ratioObject`, one line per service is written to `deststream`.
 *
 * @param {*} populationData - array of population records; an empty or missing array writes nothing
 * @param {*} ratioObject - lookup from key to `{serviceId: ratio}`
 * @param {*} areaServiceId - copied into every output line
 * @param {*} deststream - target the lines are written to
 * @returns {Promise} resolves once all records are processed; rejects if `processor` throws
 */
function mapData(populationData, ratioObject, areaServiceId, deststream) {
  return new Promise((resolve) => {
    // plain iteration: no recursion, so large arrays cannot exhaust the stack
    _.forEach(populationData, (populationObj) => {
      processor(populationObj, ratioObject, areaServiceId, deststream);
    });
    resolve();
  });
}
/**
 * Converts the service ratio records into a dictionary for fast lookup.
 *
 * Each record is keyed by `keyHelper(areaId, genderCode, ageBand)`, where the
 * gender code is "f" for "female" and "m" for anything else. The value is an
 * object mapping service id to its ratio, rescaled so the kept ratios of a
 * record add up to 1. If some services are closed, only the ids listed in
 * `serviceArray` are kept before rescaling.
 *
 * The input records are left untouched, so the same `ratioData` can be passed
 * in again and give the same result.
 *
 * @param {*} ratioData - array of records with `areaId`, `gender`, `ageBand`
 *   and `serviceRatio` (object of service id to ratio)
 * @param {*} serviceArray - service ids to keep; the single entry "all" keeps every service
 * @returns {object} dictionary of key to `{serviceId: normalised ratio}`
 */
function dicLookup(ratioData, serviceArray) {
  // ["all"] means no service is closed, so nothing has to be filtered out
  const keepAllServices = serviceArray.length === 1 && serviceArray[0] === "all";
  const returnObject = {};
  _.forEach(ratioData, (ratioObj) => {
    // work on locals: writing the recoded values back onto the record would
    // make a second pass see "f" and turn it into "m"
    const genderCode = ratioObj.gender === "female" ? "f" : "m";
    const key = keyHelper(ratioObj.areaId, genderCode, ratioObj.ageBand);
    if (!returnObject[key]) returnObject[key] = {};
    const serviceRatio = keepAllServices ? ratioObj.serviceRatio : _.pick(ratioObj.serviceRatio, serviceArray);
    // total of the ratios that remain after filtering
    let sum = 0;
    _.forEach(serviceRatio, (ratio) => {
      sum += ratio;
    });
    _.forEach(serviceRatio, (ratio, serviceId) => {
      // a zero total would give 0 / 0 = NaN; such services keep a ratio of 0
      returnObject[key][serviceId] = sum === 0 ? 0 : ratio / sum;
    });
  });
  return returnObject;
}
/**
 * Builds a lookup key from any number of values.
 *
 * Every argument is converted to a string and the pieces are joined with a
 * comma, e.g. keyHelper("E01", "f", "0-4") gives "E01,f,0-4". Called without
 * arguments it returns the empty string.
 *
 * @param {...*} parts - values that make up the key, in order
 * @returns {string} the comma-separated key
 */
const keyHelper = function(...parts) {
  // template conversion keeps undefined and null visible as "undefined" / "null"
  const pieces = parts.map((part) => `${part}`);
  return pieces.join(",");
};
return areaServiceDatabot;
}());
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment