@lighta
Last active September 12, 2018 11:52
Node.js converter from MySQL to Elasticsearch
// Code style:
// Avoid implicit calls; use semicolons and explicit returns everywhere to avoid confusion.
var parser = require('xml2json');
var async = require('async');
var fs = require('fs');
var process = require('process');
var moment = require('moment');
var mysql_singleton = require('./mysql_singleton');
var es_singleton = require('./es_singleton');
var logger = require('./winston_logger');
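// NB: the local modules are assumed to expose a shared connection through 'getPool':
//   mysql_singleton.getPool -> a node-mysql connection pool (getConnection / query)
//   es_singleton.getPool    -> an elasticsearch-js client (indices, search, bulk)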
function put_mapping(es_set,callback){
logger.info("Entering put_mapping");
const curcol_idx= es_set['index'];
const curcol_doctype= es_set['type'];
const query_create= es_set['query']['query_create'];
var query_mapping = {
'index':curcol_idx,
'body': {}
};
query_mapping['body']['mappings'] = {};
query_mapping['body']['mappings'][curcol_doctype] = query_create;
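// The body sent to indices.create ends up shaped like:
//   { mappings: { <doctype>: <query_create from config> } }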
es_singleton.getPool.indices.create( query_mapping
, function (err, response, status){
if(err) { logger.error("putMapping err="+err); }
else { logger.info("Index creation success"); }
callback(err);
return;
});
return;
}
function check_exist(es_set,callback){
logger.info("Entering check_exist");
const curcol_idx= es_set['index'];
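// indices.exists calls back with (error, exists) where 'exists' is a boolean;
// if the index is missing we create it (with its mapping) before doing anything else.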
es_singleton.getPool.indices.exists({
// 'local': false,
'index': curcol_idx
}, function (error, exists) {
if (exists === true) {
logger.info("Check success");
callback(error);
} else {
put_mapping(es_set,callback); // comment this out to let ES auto-map; put_mapping invokes the callback itself
}
return; //end exist callback
});
return;
}
/**
* Function that returns the number of stored documents and the last stored ID
* */
function check_countsave(es_set,callback){
logger.info("Entering check_countsave");
const curcol_idx= es_set['index'];
const curcol_doctype= es_set['type'];
const query_count= es_set['query']['query_count'];
//data we want to calculate here
var count_saved = 0;
var last_data = 0;
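// query_count (from the config) is expected to return the total hit count plus hits
// sorted so that hits.hits[0]._id is the last (highest) id already stored in ES.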
es_singleton.getPool.search({
index: curcol_idx,
type: curcol_doctype,
body: query_count
}, function (error, response) {
//es_singleton.getPool.close();
if(error){
logger.error('check_countsave, Err:'+error);
fs.writeFile('debug/check_countsave_debug.json', JSON.stringify(error), FileSaveAck);
}
else {
//logger.info('response='+JSON.stringify(response));
var hit = response['hits'];
count_saved = hit['total'];
if(hit['hits'].length > 0){
last_data = hit['hits'][0]['_id'];
}
logger.info("count_saved="+count_saved+" timestamp="+last_data);
}
callback(error,{'count_saved':count_saved, 'last_data':last_data});
return;
});
return;
}
/**
* Calculate the number of batches needed from the configured nb_process, batch_min and batch_max.
* Returns the number of batches and the batch size (parameters for the SQL query).
* */
function calc_nbbatch(count_toAdd,batch_opt,callback) {
var curcol_batch_min = batch_opt['min'];
var curcol_batch_max = batch_opt['max'];
const curcol_max_process = batch_opt['nb_process'];
//what we want to calculate here
var nbToAdd=0;
if(curcol_batch_min > curcol_batch_max){
logger.info('batch_opt min is greater than max, swapping values, current min='+curcol_batch_min+' max='+curcol_batch_max);
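// Swap min and max in place with three XOR assignments (no temporary variable).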
curcol_batch_min = curcol_batch_min ^ curcol_batch_max;
curcol_batch_max = curcol_batch_min ^ curcol_batch_max;
curcol_batch_min = curcol_batch_min ^ curcol_batch_max;
logger.info('min='+curcol_batch_min+' max='+curcol_batch_max);
}
if(count_toAdd <= 0){
callback(null, {'nb_batch':0 , 'batch_size':0}); //nothing to add
return;
}
var batch_size = Math.ceil(count_toAdd/curcol_max_process); //aim for nb_process batches only
if(batch_size < curcol_batch_min) {
batch_size = curcol_batch_min; //min 10K
}
if(batch_size > curcol_batch_max) {
batch_size = curcol_batch_max; //max 1M
}
var nb_batch = Math.ceil(count_toAdd/batch_size); //recompute the number of batches from the final batch size
callback(null, {'nb_batch':nb_batch , 'batch_size':batch_size});
return;
}
/**
* Function that calculates how many data items (XML documents) need to be processed (added).
* Also applies the regression strategies when the ID is not auto-incremented.
* */
function calc_nbToAdd(count_set,callback) {
const count_new=count_set['new'];
const count_tot=count_set['tot'];
const count_saved=count_set['saved'];
const count_old = count_tot-count_new;
const nb_unsync = count_old-count_saved;
const count_toAdd = count_tot-count_old; //should be = to count new if id increase
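// Example: tot=1000, new=50 -> old=950; saved=940 -> unsync=10 and toAdd=50 (equal to 'new' when ids only grow).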
logger.info('count_new='+count_new+' count_old='+count_old+' nb_unsync='+nb_unsync+' count_toAdd='+count_toAdd );
//update our set
count_set['old'] = count_old;
count_set['unsync'] = nb_unsync;
count_set['toAdd'] = count_toAdd;
//Apply check_unsync strategy ??
//placeholder in case if needed
callback(null, count_set);
return;
}
function FileSaveAck(err){
if(err){
logger.error('Couldn\'t save file');
} else {
logger.info('File saved');
}
}
function parse_sub_recellState(rcell_state,psub_recellState_callback){
async.each(rcell_state, // loop through rcell_state (call them the final results)
function(state,rcell_resul_task_callback){
if('Result' in state){
var tmp = state['Result'];
if(tmp === Object(tmp)){ //Result is an object rather than a scalar
// logger.info('found:'+ JSON.stringify(item) );
state['Result'] = ''; // erase it to avoid an ES mapping type conflict
}
}
rcell_resul_task_callback();
return;
},
function(err){ // all states looped done
if(err){ logger.error('rcell_res_final, Err:'+err); }
psub_recellState_callback();
return;
}
); //end each rcell_res
return;
}
function check_attribute(attribute, base, id, isarray) {
const debug_fs_name = 'debug/checkattr_debug'+id+'_'+base+'_'+attribute+'.json';
if(!(attribute in base)){
logger.error('check_attribute errors, "'+attribute+'" object not found in "'+base+'" for id='+id);
fs.writeFile(debug_fs_name, JSON.stringify(base), FileSaveAck);
return null;
}
var ref_tmp = base[attribute];
if(isarray == true){ //check that it is a non-empty array
if(!Array.isArray(ref_tmp) || ref_tmp.length <= 0) {
logger.error('check_attribute errors, "'+attribute+'" not an array or empty in "'+base+'" for id='+id);
fs.writeFile(debug_fs_name, JSON.stringify(ref_tmp), FileSaveAck);
return null;
}
}
return ref_tmp;
}
/**
* Check the JSON fields and transform them so that ES accepts them
* This is done in an async manner on all fields
* Also add the query to be done in an array for es_bulk
* */
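// Expected nesting (per the loops below): jav:RecellDesktopService[0] -> RecellServices[] -> RecellService[]
//   -> RecellStates[] -> RecellState[] -> Result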
function prepare_fill(id,json,datas,curcol_idx,curcol_doctype,pcallback){
//logger.info('Entering rescall');
const debug_fs_name = 'debug/prepare_fill_debug_'+id+'.json';
logger.info('prepare_fill converting id='+id);
//logger.info('json ='+JSON.stringify(json));
if(!('jav:RecellDesktopService' in json)){
if(('tns:PhoneTestService' in json)){
logger.info('Found "tns:PhoneTestService" (unsupported yet), dumping... for id='+id);
fs.writeFile('debug/prepare_fill_debug_tns_'+id+'.json', JSON.stringify(json), FileSaveAck);
}
else { //totally unhandled
logger.error('Not supported json (basetype not "jav:RecellDesktopService") for id='+id);
fs.writeFile('debug/prepare_fill_debug_base_'+id+'.json', JSON.stringify(json), FileSaveAck);
}
pcallback();
return;
}
var ref_json = json["jav:RecellDesktopService"][0];
if('Computer' in ref_json){
if('Uptime' in ref_json['Computer'][0]){
if('$t' in ref_json['Computer'][0]["Uptime"][0]){
var uptime = ref_json['Computer'][0]["Uptime"][0]["$t"];
//if( isNaN(uptime) == true){
if (typeof uptime != "number") {
logger.error('Normalizing Computer.Uptime (not a number) for id='+id);
ref_json['Computer'][0]["Uptime"][0]["$t"] = 0; //normalize it in the document we will index
}
}
}
}
var rcell_services = check_attribute('RecellServices', ref_json, id, true);
if( rcell_services === null){
pcallback();
return;
}
async.each(rcell_services,
function(services,rcell_services_task_callback){ // loop through all services
var ref_service = check_attribute('RecellService', services, id, true);
if( ref_service === null){
rcell_services_task_callback();
return;
}
async.each(ref_service,
function(service,ref_service_task_callback){ // loop through each service
var rcell_states = check_attribute('RecellStates', service, id, true);
if( rcell_states === null){
ref_service_task_callback();
return;
}
async.each(rcell_states, // loop through all states
function(state,rcell_states_task_callback){
var ref_state = check_attribute('RecellState', state, id, true);
if( ref_state === null){
rcell_states_task_callback();
return;
}
parse_sub_recellState(ref_state,rcell_states_task_callback);
return;
},
function(err){ // all states looped done
if(err){ logger.info('rcell_states_final, Err:'+err); }
ref_service_task_callback();
return;
}
); //end each rcell_states
return;
},
function(err){ // all service looped done
if(err){ logger.info('ref_service_final, Err:'+err); }
rcell_services_task_callback();
return;
}
);
return; //end ref_service
},
function(err){ // all services looped done
if(err){ logger.info('rcell_services_final, Err:'+err); }
else { //add our modified json into datas for bulk
datas.push({ index: { _index: curcol_idx, _type: curcol_doctype, _id: id } });
datas.push(json);
}
pcallback();
return;
}
);
return;
}
function Es_BulkAck(err, resp) {
if(err){
logger.error('Es_bulk error='+err);
}
else {
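// The bulk API reports partial failures through resp.errors; per-item details live in resp.items.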
if(resp['errors']==true){
logger.error('Es_bulk error, dumping response_json');
fs.appendFile('debug/Es_bulk.json', '\n'+JSON.stringify(resp), FileSaveAck);
// fs.writeFile('debug/Es_bulk.json', JSON.stringify(resp), FileSaveAck);
}
}
return;
}
function ResultSQL_ACK(err) {
if(err){
logger.error('ResultSQL_ACK error='+err);
}
else {
logger.info('All SQL results treated');
}
return;
}
/**
* Function to grab the data from MySQL, transform it and fill it into ES.
* (@TODO split me)
* */
function fetch_data(connection,idx_b,last_data_set,fetch_callback){
const nb_batch = last_data_set['nb_batch']; //avoid * dereference in loop
const batch_size = last_data_set['batch_size'];
const xml_opt = last_data_set['xml_opt'];
const curcol_idx = last_data_set['curcol_idx'];
const curcol_doctype = last_data_set['curcol_doctype'];
const last_id = last_data_set['last_data'];
const sql = 'select id,xml from record where id > ? LIMIT ? OFFSET ?'; //values are bound below to avoid building SQL by concatenation
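// Each call handles one LIMIT/OFFSET page; the next page is kicked off from inside the
// query callback below, so several batches can be in flight at the same time.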
if(nb_batch <= 0){
connection.release();
fetch_callback(null, 'nothing to do'); //nothing to fetch, finish the waterfall
return;
}
logger.info('Entering fetch_data');
//logger.info('bath_set='+JSON.stringify(batch_set));
logger.info('x:'+idx_b+'/'+nb_batch+', querying ='+sql);
connection.query(sql, [last_id, batch_size, idx_b*batch_size], function(err, results) {
if(idx_b === nb_batch-1 ) {
connection.release(); // always put connection back in pool after last query
//es_singleton.getPool.close();
fetch_callback(err, 'done');
}
else { //start the next batch query in the meantime
fetch_data(connection,++idx_b,last_data_set,fetch_callback);
}
if(err) {
logger.error(err);
return;
}
if(results.length <= 0){
return;
}
var datas = [];
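// 'datas' is filled with alternating entries for the ES bulk API:
//   { index: { _index, _type, _id } } followed by the document itself.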
async.each(results,
function(res,callback){ // 2nd param is the function that each item is passed to
//logger.info(results);
const id=res['id'];
const xmlText = res['xml'];
var json = parser.toJson(xmlText,xml_opt); //xml_opt is expected to request an object (the default is a JSON string) so the fields can be inspected below
//logger.info(json);
prepare_fill(id, json ,datas,curcol_idx,curcol_doctype,callback);
return; //end task callback
},
function(err){ // 3rd param is the function to call when everything's done
if(err){
logger.error('Error in parsing sql_result='+sql+' err:'+err);
fetch_callback();
return;
}
//logger.info('ES_CALL, data='+JSON.stringify(datas));
if(datas.length > 0) {
es_singleton.getPool.bulk({
body: datas
}, Es_BulkAck);
}
ResultSQL_ACK(err);
//do not fetch_callback !
return; //end async.each final callback
}
); //end async.each
return; //end query_select callback
}); //end query_sql _select id,xml
return;
}
/**
* Function to load the configuration from a JSON file
* */
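// config.json is expected to provide at least: xml_convert_options, collectors[].index,
// collectors[].type, batch_opt {min,max,nb_process} and es_query {query_create, query_count}.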
function load_config(callback){
var json_config;
fs.readFile('config.json',{'encoding':'utf8', 'flag':'r'}, function(err, data){
if(err) logger.error('load_config, failed to open file');
else {
logger.info('Config loaded');
json_config = JSON.parse(data); //assign to the outer variable (a 'const' here would shadow it and the callback would get undefined)
}
callback(err,json_config);
return;
});
return;
}
/**
* Simple function that reschedules our main task to run again after an interval.
* NB: we do not use setInterval as we only want to fire again once the current collecting run has ended,
* otherwise we would end up with a lot of competing processes...
* */
function reschedule(){
//setTimeout(main, 1000*60*2); //call us again
logger.info('Asking to reschedule');
return;
}
/**
* General process (main) :
* Check if es_index exist => create if not
* Get number of document in es_index
* Get number of data in mysql
* Compare with number of data in ES
* Calculate nb_batch
* Fetch and fill data
* Wait for next_clock
* */
function main(){
load_config( function(err,json_config) {
if(err){ reschedule(); return; }
//logger.info('config='+JSON.stringify(json_config));
const xml_convert_opt = json_config['xml_convert_options'];
const curcol_idx = json_config["collectors"][0]["index"]; //@TODO loop on me when rdy !
const curcol_doctype = json_config["collectors"][0]["type"]; //@TODO loop on me when rdy !
const batch_opt = json_config['batch_opt']; //better to have batch settings per collector as data size may vary, but well...
const es_set = {
'query': json_config['es_query'],
'index': curcol_idx,
'type': curcol_doctype
};
//logger.info('xml_convert_opt='+JSON.stringify(xml_convert_opt));
//return;
check_exist(es_set,function(err) {
check_countsave(es_set,function(err, results) {
const count_saved = results['count_saved'];
const last_data = results['last_data'];
mysql_singleton.getPool.getConnection(function(err, connection) {
if(err) { logger.error(err); reschedule(); return; }
const TimeStart = moment();
const sql = "select count(1) from record";
connection.query(sql, [], function(err, results) {
if(err) { logger.error(err); return; }
const count_tot=results[0]['count(1)'];
logger.info('Count_tot='+count_tot);
if(count_tot <= count_saved){
logger.info("data already loaded, finishing...");
reschedule(); return;
}
var sql = "select count(1) from record where id > "+last_data;
connection.query(sql, [], function(err, results) {
if(err) { logger.error(err); reschedule(); return; }
async.waterfall([
function(callback) { //calc_nbToAdd
const count_new=results[0]['count(1)'];
var count_set = { //ensemble of counts (will be altered in calc_nbToAdd)
'new':count_new,
'tot':count_tot,
'saved':count_saved
};
calc_nbToAdd(count_set,callback);
return;
},
function(count_set, callback) { //calc_nbbatch
const count_toAdd = count_set['toAdd'];
calc_nbbatch(count_toAdd,batch_opt,callback); //batch_set is delivered through the callback
return;
},
function(batch_set, callback) { //fetch_data
var idx_b=0; //batch loop index
logger.info('batch_set='+JSON.stringify(batch_set));
const last_data_set = {
'nb_batch': batch_set['nb_batch'],
'batch_size': batch_set['batch_size'],
'xml_opt':xml_convert_opt,
'curcol_idx':curcol_idx,
'curcol_doctype':curcol_doctype,
'last_data': last_data
};
// last_data,batch_set,'test_npm','test_npm'
fetch_data(connection,idx_b,last_data_set,callback);
return;
}
], function (err, result) { //final
var TimeStop = moment();
const minDiff = TimeStop.diff(TimeStart, 'minutes');
logger.info('All waterfall is done, took '+minDiff+' minutes');
reschedule(); //call us again
return;
});
return; //end query_sql _count_new callback
}); //end query_sql _count_new
return; //end query_sql _count callback
}); //end query_sql _count_tot
return; // end sql_con callback
}); //end sql_con
return; //end check_countsave callback
});
return; //end exist callback
});
return; //end load_config callback
}); //end load_config
return;
}
//Entry point :
setTimeout(main, 3000); //Wait a bit for initialisation then go !
@LeeYunhang

WOW, fantastic! I don't understand why you don't put this code in a GitHub repository?
