Node.js converter from MySQL to an Elasticsearch node
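The script expects a config.json next to it (see load_config below), which is not included in this gist. A hypothetical sketch based on the fields main() reads follows; the 10K/1M/20 limits echo the comments in calc_nbbatch, the xml2json options are inferred from how prepare_fill indexes the parsed document, and the index name, mapping and query_count body are illustrative guesses only:

{
	"xml_convert_options": { "object": true, "arrayNotation": true, "coerce": true },
	"collectors": [ { "index": "recell", "type": "record" } ],
	"batch_opt": { "min": 10000, "max": 1000000, "nb_process": 20 },
	"es_query": {
		"query_create": { "properties": {} },
		"query_count": { "size": 1, "sort": [ { "_id": { "order": "desc" } } ], "query": { "match_all": {} } }
	}
}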
// Code style: avoid implicit calls; use explicit semicolons and returns to avoid confusion.
var parser = require('xml2json');
var async = require('async');
var fs = require('fs');
var process = require('process');
var moment = require('moment');
var mysql_singleton = require('./mysql_singleton');
var es_singleton = require('./es_singleton');
var logger = require('./winston_logger');
function put_mapping(es_set,callback){
	logger.info("Entering put_mapping");
	const curcol_idx= es_set['index'];
	const curcol_doctype= es_set['type'];
	const query_create= es_set['query']['query_create'];
	var query_mapping = {
		'index':curcol_idx,
		'body': {}
	};
	query_mapping['body']['mappings'] = {};
	query_mapping['body']['mappings'][curcol_doctype] = query_create;
	es_singleton.getPool.indices.create( query_mapping
		, function (err, response, status){
			if(err) { logger.error("putMapping err="+err); }
			else { logger.info("Index creation success"); }
			callback(err);
			return;
	});
	return;
}
function check_exist(es_set,callback){
	logger.info("Entering check_exist");
	const curcol_idx= es_set['index'];
	es_singleton.getPool.indices.exists({
		// 'local': false,
		'index': curcol_idx
	}, function (error, exists) {
		if (exists === true) {
			logger.info("Check success");
			callback(error);
		} else {
			put_mapping(es_set,callback); //comment this out to use auto mapping by ES; put_mapping invokes the callback itself
		}
		return; //end exists callback
	});
	return;
}
/**
 * Function that returns the number of stored documents and the last stored ID
 * */
function check_countsave(es_set,callback){
	logger.info("Entering check_countsave");
	const curcol_idx= es_set['index'];
	const curcol_doctype= es_set['type'];
	const query_count= es_set['query']['query_count'];
	//data we want to calculate here
	var count_saved = 0;
	var last_data = 0;
	es_singleton.getPool.search({
		index: curcol_idx,
		type: curcol_doctype,
		body: query_count
	}, function (error, response) {
		//es_singleton.getPool.close();
		if(error){
			logger.error('check_countsave, Err:'+error);
			fs.writeFile('debug/check_countsave_debug.json', JSON.stringify(error), FileSaveAck);
		}
		else {
			//logger.info('response='+JSON.stringify(response));
			var hit = response['hits'];
			count_saved = hit['total'];
			if(hit['hits'].length > 0){
				last_data = hit['hits'][0]['_id'];
			}
			logger.info("count_saved="+count_saved+" timestamp="+last_data);
		}
		callback(error,{'count_saved':count_saved, 'last_data':last_data});
		return;
	});
	return;
}
/**
 * Calculate the number of batches needed from our number of cpu processes, batch_min and batch_max.
 * Returns the number and size of batches (parameters for the SQL query).
 * A worked example follows the function.
 * */
function calc_nbbatch(count_toAdd,batch_opt,callback) {
	var curcol_batch_min = batch_opt['min'];
	var curcol_batch_max = batch_opt['max'];
	const curcol_max_process = batch_opt['nb_process'];
	//what we want to calculate here
	var batch_size = 0;
	var nb_batch = 0;
	if(curcol_batch_min > curcol_batch_max){
		logger.info('Since when is a min greater than a max?... swapping values, current min='+curcol_batch_min+' max='+curcol_batch_max);
		curcol_batch_min = curcol_batch_min ^ curcol_batch_max; //XOR swap
		curcol_batch_max = curcol_batch_min ^ curcol_batch_max;
		curcol_batch_min = curcol_batch_min ^ curcol_batch_max;
		logger.info('min='+curcol_batch_min+' max='+curcol_batch_max);
	}
	if(count_toAdd <= 0){ //nothing to add, no batch needed
		callback(null, {'nb_batch':nb_batch , 'batch_size':batch_size});
		return;
	}
	batch_size = Math.ceil(count_toAdd/curcol_max_process); //try to keep nb_process threads/processes only
	if(batch_size < curcol_batch_min) {
		batch_size = curcol_batch_min; //min 10K
	}
	if(batch_size > curcol_batch_max) {
		batch_size = curcol_batch_max; //max 1M
	}
	nb_batch = Math.ceil(count_toAdd/batch_size); //recalculate the number of batches accordingly
	callback(null, {'nb_batch':nb_batch , 'batch_size':batch_size});
	return;
}
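// Worked example (illustrative numbers, not from the original): with count_toAdd = 150000
// and batch_opt = { 'min': 10000, 'max': 1000000, 'nb_process': 20 }:
//   batch_size = ceil(150000 / 20) = 7500, clamped up to the 10000 minimum,
//   nb_batch   = ceil(150000 / 10000) = 15,
// so the callback receives { 'nb_batch': 15, 'batch_size': 10000 }.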
/**
 * Function that calculates how many data entries (XML files) will have to be processed (added).
 * Also meant to apply a regression strategy if the ID column is not auto-incrementing.
 * A worked example follows the function.
 * */
function calc_nbToAdd(count_set,callback) {
	const count_new=count_set['new'];
	const count_tot=count_set['tot'];
	const count_saved=count_set['saved'];
	const count_old = count_tot-count_new;
	const nb_unsync = count_old-count_saved;
	const count_toAdd = count_tot-count_old; //should be equal to count_new if the id only increases
	logger.info('count_new='+count_new+' count_old='+count_old+' nb_unsync='+nb_unsync+' count_toAdd='+count_toAdd );
	//update our set
	count_set['old'] = count_old;
	count_set['unsync'] = nb_unsync;
	count_set['toAdd'] = count_toAdd;
	//Apply a check_unsync strategy here ??
	//placeholder in case it's needed
	callback(null, count_set);
	return;
}
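// Worked example (illustrative numbers, not from the original): with 1000 rows in MySQL (tot),
// 100 of them newer than the last stored _id (new) and 880 documents already in ES (saved):
//   count_old = 1000 - 100 = 900, nb_unsync = 900 - 880 = 20 (old rows missing from ES),
//   count_toAdd = 1000 - 900 = 100.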
function FileSaveAck(err){
	if(err){
		logger.error('Couldn\'t save file');
	} else {
		logger.info('File saved');
	}
}
function parse_sub_recellState(rcell_state,psub_recellState_callback){
	async.each(rcell_state, // loop through rcell_state (let's call them final results)
		function(state,rcell_resul_task_callback){
			if('Result' in state){
				var tmp = state['Result'];
				if(tmp === Object(tmp)){ //Result is an object rather than a scalar
					// logger.info('found:'+ JSON.stringify(item) );
					state['Result'] = ''; // erase it on the state itself (to fix the conflicting type)
				}
			}
			rcell_resul_task_callback();
			return;
		},
		function(err){ // all states looped, done
			if(err){ logger.error('rcell_res_final, Err:'+err); }
			psub_recellState_callback();
			return;
		}
	); //end each rcell_res
	return;
}
function check_attribute(attribute, base, id, isarray) {
	const debug_fs_name = 'debug/checkattr_debug'+id+'_'+base+'_'+attribute+'.json';
	if(!(attribute in base)){
		logger.error('check_attribute errors, "'+attribute+'" object not found in "'+base+'" for id='+id);
		fs.writeFile(debug_fs_name, JSON.stringify(base), FileSaveAck);
		return null;
	}
	var ref_tmp = base[attribute];
	if(isarray == true){ //check that it's a non-empty array
		if(ref_tmp.length <= 0) {
			logger.error('check_attribute errors, "'+attribute+'" not an array or empty in "'+base+'" for id='+id);
			fs.writeFile(debug_fs_name, JSON.stringify(ref_tmp), FileSaveAck);
			return null;
		}
	}
	return ref_tmp;
}
/**
 * Check JSON fields and transform them so that ES accepts them.
 * This is done in an async manner on all fields.
 * Also adds the query to be done into an array for es_bulk.
 * (A sketch of the expected document shape follows the function.)
 * */
function prepare_fill(id,json,datas,curcol_idx,curcol_doctype,pcallback){
	//logger.info('Entering rescall');
	const debug_fs_name = 'debug/prepare_fill_debug_'+id+'.json';
	logger.info('prepare_fill converting id='+id);
	//logger.info('json ='+JSON.stringify(json));
	if(!('jav:RecellDesktopService' in json)){
		if(('tns:PhoneTestService' in json)){
			logger.info('Found "tns:PhoneTestService" (unsupported yet), dumping... for id='+id);
			fs.writeFile('debug/prepare_fill_debug_tns_'+id+'.json', JSON.stringify(json), FileSaveAck);
		}
		else { //totally unhandled
			logger.error('Unsupported json (basetype is not "jav:RecellDesktopService") for id='+id);
			fs.writeFile('debug/prepare_fill_debug_base_'+id+'.json', JSON.stringify(json), FileSaveAck);
		}
		pcallback();
		return;
	}
	var ref_json = json["jav:RecellDesktopService"][0];
	if('Computer' in ref_json){
		if('Uptime' in ref_json['Computer'][0]){
			if('$t' in ref_json['Computer'][0]["Uptime"][0]){
				var uptime = ref_json['Computer'][0]["Uptime"][0]["$t"];
				//if( isNaN(uptime) == true){
				if (typeof uptime != "number") {
					logger.error('Normalizing Computer.Uptime for id='+id);
					ref_json['Computer'][0]["Uptime"][0]["$t"] = 0; //normalize it on the document itself
					console.log('This is not a number');
				}
			}
		}
	}
	var rcell_services = check_attribute('RecellServices', ref_json, id, true);
	if( rcell_services === null){
		pcallback();
		return;
	}
	async.each(rcell_services,
		function(services,rcell_services_task_callback){ // loop through all services
			var ref_service = check_attribute('RecellService', services, id, true);
			if( ref_service === null){
				rcell_services_task_callback();
				return;
			}
			async.each(ref_service,
				function(service,ref_service_task_callback){ // loop through each service
					var rcell_states = check_attribute('RecellStates', service, id, true);
					if( rcell_states === null){
						ref_service_task_callback();
						return;
					}
					async.each(rcell_states, // loop through all states
						function(state,rcell_states_task_callback){
							var ref_state = check_attribute('RecellState', state, id, true);
							if( ref_state === null){
								rcell_states_task_callback();
								return;
							}
							parse_sub_recellState(ref_state,rcell_states_task_callback);
							return;
						},
						function(err){ // all states looped, done
							if(err){ logger.info('rcell_states_final, Err:'+err); }
							ref_service_task_callback();
							return;
						}
					); //end each rcell_states
					return;
				},
				function(err){ // each service looped, done
					if(err){ logger.info('rcell_service_final, Err:'+err); }
					rcell_services_task_callback();
					return;
				}
			);
			return; //end ref_service
		},
		function(err){ // all services looped, done
			if(err){ logger.info('rcell_services_final, Err:'+err); }
			else { //add our modified json into datas for the bulk query
				datas.push({ index: { _index: curcol_idx, _type: curcol_doctype, _id: id } });
				datas.push(json);
			}
			pcallback();
			return;
		}
	);
	return;
}
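// Sketch of the parsed document shape prepare_fill expects, inferred from the accessors above
// (element names beyond the ones actually accessed are unknown; "$t" is xml2json's text key):
// {
//   "jav:RecellDesktopService": [ {
//     "Computer": [ { "Uptime": [ { "$t": 12345 } ] } ],
//     "RecellServices": [ {
//       "RecellService": [ {
//         "RecellStates": [ { "RecellState": [ { "Result": "..." } ] } ]
//       } ]
//     } ]
//   } ]
// }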
function Es_BulkAck(err, resp) {
	if(err){
		logger.error('Es_bulk error='+err);
	}
	else {
		if(resp['errors']==true){
			logger.error('Es_bulk error, dumping response_json');
			fs.appendFile('debug/Es_bulk.json', '\n'+JSON.stringify(resp), FileSaveAck);
			// fs.writeFile('debug/Es_bulk.json', JSON.stringify(resp), FileSaveAck);
		}
	}
	return;
}
function ResultSQL_ACK(err) {
	if(err){
		logger.error('ResultSQL_ACK error='+err);
	}
	else {
		logger.info('All SQL results treated');
	}
	return;
}
/**
 * Function to grab the data from MySQL, transform it and fill it into ES.
 * (@TODO split me)
 * */
function fetch_data(connection,idx_b,last_data_set,fetch_callback){
	const nb_batch = last_data_set['nb_batch']; //avoid dereferencing in the loop
	const batch_size = last_data_set['batch_size'];
	const xml_opt = last_data_set['xml_opt'];
	const curcol_idx = last_data_set['curcol_idx'];
	const curcol_doctype = last_data_set['curcol_doctype'];
	const last_id = last_data_set['last_data'];
	const sql = 'select id,xml from record where id > '+last_id+' LIMIT '+batch_size+' OFFSET '+(idx_b*batch_size);
	if(nb_batch <= 0){
		return;
	}
	logger.info('Entering fetch_data');
	//logger.info('batch_set='+JSON.stringify(batch_set));
	logger.info('x:'+idx_b+'/'+nb_batch+', querying ='+sql);
	connection.query(sql, [], function(err, results) {
		if(idx_b === nb_batch-1 ) {
			connection.release(); // always put the connection back into the pool after the last query
			//es_singleton.getPool.close();
			fetch_callback(err, 'done');
		}
		else // if (idx_b < 3)
		{ //start the next query in the meantime
			fetch_data(connection,++idx_b,last_data_set,fetch_callback);
		}
		if(err) {
			logger.error(err);
			return;
		}
		if(results.length <= 0){
			return;
		}
		var datas = [];
		async.each(results,
			function(res,callback){ // 2nd param is the function each item is passed to
				//logger.info(results);
				const id=res['id'];
				const xmlText = res['xml'];
				var json = parser.toJson(xmlText,xml_opt); //returns a string containing the JSON structure by default
				//logger.info(json);
				prepare_fill(id, json ,datas,curcol_idx,curcol_doctype,callback);
				return; //end task callback
			},
			function(err){ // 3rd param is the function to call when everything is done
				if(err){
					logger.error('Error in parsing sql_result='+sql+' err:'+err);
					fetch_callback();
					return;
				}
				//logger.info('ES_CALL, data='+JSON.stringify(datas));
				if(datas.length > 0) {
					es_singleton.getPool.bulk({
						body: datas
					}, Es_BulkAck);
				}
				ResultSQL_ACK(err);
				//do not call fetch_callback here!
				return; //end async.each final callback
			}
		); //end async.each
		return; //end query_select callback
	}); //end query_sql _select id,xml
	return;
}
/**
 * Function to load the configuration from a json file
 * */
function load_config(callback){
	var json_config;
	fs.readFile('config.json',{'encoding':'utf8', 'flag':'r'}, function(err, data){
		if(err) logger.error('load_config, failed to open file');
		else {
			logger.info('Config loaded');
			json_config = JSON.parse(data); //fill the outer variable, no re-declaration here
		}
		callback(err,json_config);
		return;
	});
	return;
}
/**
 * Simple function that reschedules our main task to run at an interval.
 * NB: We do not use setInterval since we only want to fire this once the current collecting process has ended,
 * otherwise we would end up with lots of processes competing...
 * */
function reschedule(){
	//setTimeout(main, 1000*60*2); //call us again
	logger.error('Asking to reschedule');
	return;
}
/**
 * General process (main) :
 * Check if es_index exists => create it if not
 * Get the number of documents in es_index
 * Get the number of rows in mysql
 * Compare with the number of documents in ES
 * Calculate nb_batch
 * Fetch and fill the data
 * Wait for the next clock tick
 * */
function main(){
	load_config( function(err,json_config) {
		if(err){ reschedule(); return; }
		//logger.info('config='+JSON.stringify(json_config));
		const xml_convert_opt = json_config['xml_convert_options'];
		const curcol_idx = json_config["collectors"][0]["index"]; //@TODO loop on me when ready!
		const curcol_doctype = json_config["collectors"][0]["type"]; //@TODO loop on me when ready!
		const batch_opt = json_config['batch_opt']; //better to have batch settings per collector since data size may vary, but well...
		const es_set = {
			'query': json_config['es_query'],
			'index': curcol_idx,
			'type': curcol_doctype
		};
		//logger.info('xml_convert_opt='+JSON.stringify(xml_convert_opt));
		//return;
		check_exist(es_set,function(err) {
			check_countsave(es_set,function(err, results) {
				const count_saved = results['count_saved'];
				const last_data = results['last_data'];
				mysql_singleton.getPool.getConnection(function(err, connection) {
					if(err) { logger.error(err); reschedule(); return; }
					const TimeStart = moment();
					const sql = "select count(1) from record";
					connection.query(sql, [], function(err, results) {
						if(err) { logger.error(err); return; }
						const count_tot=results[0]['count(1)'];
						logger.info('Count_tot='+count_tot);
						if(count_tot <= count_saved){
							logger.info("data already loaded, finishing...");
							reschedule(); return;
						}
						var sql = "select count(1) from record where id > "+last_data;
						connection.query(sql, [], function(err, results) {
							if(err) { logger.error(err); reschedule(); return; }
							async.waterfall([
								function(callback) { //calc_nbToAdd
									const count_new=results[0]['count(1)'];
									var count_set = { //ensemble of counts (will be altered in calc_nbToAdd)
										'new':count_new,
										'tot':count_tot,
										'saved':count_saved
									};
									calc_nbToAdd(count_set,callback);
									return;
								},
								function(count_set, callback) { //calc_nbbatch
									const count_toAdd = count_set['toAdd'];
									calc_nbbatch(count_toAdd,batch_opt,callback); //result is passed on through the callback
									return;
								},
								function(batch_set, callback) { //fetch_data
									var idx_b=0; //batch loop index
									logger.info('batch_set='+JSON.stringify(batch_set));
									const last_data_set = {
										'nb_batch': batch_set['nb_batch'],
										'batch_size': batch_set['batch_size'],
										'xml_opt':xml_convert_opt,
										'curcol_idx':'test_npm',
										'curcol_doctype':'test_npm',
										'last_data': last_data
									};
									// last_data,batch_set,'test_npm','test_npm'
									fetch_data(connection,idx_b,last_data_set,callback);
									return;
								}
							], function (err, result) { //final
								var TimeStop = moment();
								const minDiff = TimeStop.diff(TimeStart, 'minutes');
								logger.info('All waterfall is done, took minDiff='+minDiff);
								reschedule(); //call us again
								return;
							});
							return; //end query_sql _count_new callback
						}); //end query_sql _count_new
						return; //end query_sql _count callback
					}); //end query_sql _count_tot
					return; // end sql_con callback
				}); //end sql_con
				return; //end check_countsave callback
			});
			return; //end exist callback
		});
		return; //end load_config callback
	}); //end load_config
	return;
}
//Entry point :
setTimeout(main, 3000); //Wait a bit for initialisation then go!
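The script also requires three local modules that are not included in this gist (./mysql_singleton, ./es_singleton and ./winston_logger). A minimal sketch of what they might look like, assuming the mysql and legacy elasticsearch npm client packages and winston 2.x; the host, credentials and connection limits below are placeholders, not values from the original:

// mysql_singleton.js (hypothetical sketch)
var mysql = require('mysql');
exports.getPool = mysql.createPool({
	connectionLimit: 20,        //placeholder
	host: 'localhost',          //placeholder
	user: 'collector',          //placeholder
	password: 'secret',         //placeholder
	database: 'recell'          //placeholder
});

// es_singleton.js (hypothetical sketch)
var elasticsearch = require('elasticsearch');
exports.getPool = new elasticsearch.Client({
	host: 'localhost:9200',     //placeholder
	log: 'error'
});

// winston_logger.js (hypothetical sketch, winston 2.x API)
var winston = require('winston');
module.exports = new (winston.Logger)({
	transports: [ new (winston.transports.Console)({ timestamp: true }) ]
});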
Wow, fantastic! I don't understand why you don't put this code in a GitHub repository?