Skip to content

Instantly share code, notes, and snippets.

@thanthos
Last active February 25, 2016 02:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thanthos/b18d465ab7771533487a to your computer and use it in GitHub Desktop.
Save thanthos/b18d465ab7771533487a to your computer and use it in GitHub Desktop.
This is a JavaScript program that helps with extracting data from Elasticsearch. Somehow, this script uses less memory than the default Logstash. (I encountered errors with Logstash but not with this JavaScript.) This iteration of the script is single-threaded and intended for someone who knows how to write JavaScript. Will evolve it to become more of…
require('dotenv').load({'path': '.env'});
var elasticSearch = require("elasticsearch");
var esRWClient = require("./esClient"); //This is es client initialization
var esRClient = require("./esClient_readOnly");
var bunyan = require('bunyan');
var target_index = process.env.target||'reindex'; //Change this to your target index .
var source_index = process.env.source; //Change this to your source index.
var global_scroll_id;
var failedLoged = bunyan.createLogger({
name:'failedLogger',
streams: [{
type: 'rotating-file',
path: 'failIndexed.log',
period: '1d', // daily rotation
count: 3, // keep 3 back copies
level:'debug'
}],
});
var log = bunyan.createLogger({
name: 'reindex',
streams: [
{
level: 'info',
stream: process.stdout
},
{
level: 'debug',
path: 'reindex.debug.log' // log ERROR and above to a file
}
]
});
var name = __filename;
var doneCount = 0; //For total Read
var indexCount = 0; //Successful Indexing
var failCount = 0; //failed
var alreadyDone = 0; //This is for the redo.
var offset = process.env.offset||0;
var size = process.env.size||1000;//Change this as per your need. A bigger number for more powerful elasticserach source. A smaller number for those with resource constrain.
var totalPopulation = 0;
var batchWriteSize = process.env.batchwritesize||500;
var startTime=0, endTime=0;
//Pass in the upserting function if requiring to update.
function read(scrollID){
function printStatus(count, batchsize, success, failed, total){
if ( count >= batchsize){
log.info({total:total,batch:batchsize,success:success, failed, failed,count:count, offset:offset, alreadyDone:alreadyDone},"Progress so far ");
}
}
log.debug("read. ScrollID:- "+scrollID);
function processResponse(resp){ //Single Record
var resultArray = resp.hits.hits;
var batchSize = resp.hits.hits.length;
global_scroll_id = resp._scroll_id;
var current = 0;
log.debug({batchSize: batchSize});
function nextStep(){
if ( doneCount >= totalPopulation && current >= batchSize){
log.info({total:totalPopulation,success:indexCount,failed:failCount},"Total Completed." );
}else if (current >= batchSize) {
printStatus(doneCount, batchSize,indexCount,failCount,totalPopulation);
read(resp._scroll_id);
}
};
for ( var i in resultArray ){
var instance = resultArray[i];
log.trace({instance:instance});
try{
esRWClient.create({
"index":target_index,
"type":instance._type,
"id":instance._id,
"body":instance._source, //You may need to change this to suit your reindexing needs
"timestamp":instance._source["@timestamp"]
}).then(function (response){
log.trace({index:target_index,type:instance._type,id:instance._id},"document");
indexCount++;
doneCount++;
current++;
nextStep();
}, function (error){
if ( !error.status == 409 ) {
failedLoged.error({error:error.path},"Error Indexing");
failCount++
}else{
alreadyDone++;
}//
current++;
doneCount++;
nextStep();
});
}catch(e) {
log.error(e);
}
}
}
function processInBatch(resp){
var resultArray = resp.hits.hits;
var batchSize = resp.hits.hits.length;
var current = 0;
global_scroll_id = resp._scroll_id;
var request_body = []; //This is the payload for the bulk index
function composeAction(instance){
request_body.push({
create:{
_index:target_index,
_type:instance._type,
_id:instance._id,
_timestamp:instance._source["@timestamp"]
}
});
request_body.push(instance._source);
}
for ( var i in resultArray ){
var instance = resultArray[i];
composeAction(instance);
if ( current >= batchWriteSize || current >= batchSize || current >= totalPopulation ){
esRWClient.bulk({body:request_body}).then(
function(response){
log.debug({response:response},"success res");
indexCount=indexCount+current;
doneCount=doneCount+current;
},
function(error){
log.debug({error:error},"err res");
failCount = failCount+current;
doneCount=doneCount+current;
});
}
}
}
if ( scrollID||global_scroll_id ) {
try{
log.debug("To Next Frame");
esRClient.scroll({scrollId:scrollID, scroll: '2m'}).then(
function(response){
processResponse(response);
}, function(err){
//TODO. Differentiate between the error and only retry those due to network
log.warn({error:err,doneCount:doneCount,totalPopulation:totalPopulation},"Issue with Scrolling to next. Retrying");
if ( !(doneCount >= totalPopulation) ){
setTimeout(read(global_scroll_id),1000);
}
});
}catch(e){
//TODO. Differentiate between the error and only retry those due to network
log.warn({error:e,doneCount:doneCount,totalPopulation:totalPopulation},"Issue with Scrolling to next. Retrying");
if ( !(doneCount >= totalPopulation) ){
setTimeout(read(global_scroll_id),1000);
}
}
}else{
log.info("Getting Records");
log.info({target_index:target_index,source_index:source_index});
esRClient.search({
'index':source_index,
'size':size,
'scroll':"2m",
'body':{"query": {
"filtered": {
"query": {"query_string": { "query": "*" } },
"filter":{"bool": {
"must": [{"range": {"@timestamp": { "gte": startTime||0,"lte": endTime||9000000000000, "format": "epoch_millis" }}}],
"must_not": []
}
}
}
}},
'_source':true,
'_sourceInclude': ["message","type","@timestamp","post_code","device_id","@version"]
}).then(function(response){
totalPopulation = response.hits.total;
log.info("Total Records Matched :"+totalPopulation);
processResponse(response);
},function(err){
log.warn({err:err},"Issue with Getting Records. Not Retrying");
printStatus(0, -9,indexCount,failCount,0);
log.error(err);
});
}
}
function printUsage(){
console.log("Incorrect Usage");
console.log("node javascript_reindexing.js [optional log level]");
}
process.on('uncaughtException', function(err){
if ( err != 0){
log.warn({err:err});
console.log("doneCount "+doneCount +" ::::indexCount "+indexCount +" ::::failCount "+ failCount );
}
if ( global_scroll_id && esRClient ){
console.log("Clear Scroll");
esRClient.clearScroll(global_scroll_id);
}
});
process.on('exit', function(err){
if ( err != 0 ){
log.warn({err:err});
console.log("doneCount "+doneCount +" ::::indexCount "+indexCount +" ::::failCount "+ failCount );
}
if ( global_scroll_id && esRClient ){
console.log("Clear Scroll");
esRClient.clearScroll(global_scroll_id);
}
});
if ( process.argv.length < 3 ) {
printUsage();
process.exit(9);
}
if ( process.argv[2] ){
if ( process.argv[2] != 'trace' &&
process.argv[2] != 'debug' &&
process.argv[2] != 'info' &&
process.argv[2] != 'warn' &&
process.argv[2] != 'error' &&
process.argv[2] != 'fatal' ) {
log = bunyan.createLogger({
name: 'reindex',
level: process.env.LOG_LEVEL||'info'
});
log.warn("Incorrect Log level Specified at commandline. Defaulting to "+process.env.LOG_LEVEL||'info');
}else{
log = bunyan.createLogger({
name: 'reindex',
level: process.argv[2]
});
}
}
if (process.argv[3]) {
startTime = process.argv[3]||0;
}
if (process.argv[4]) {
endTime = process.argv[4]||0;
}
log.info("Reindexing");
read();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment