Skip to content

Instantly share code, notes, and snippets.

@hillar
Last active December 16, 2015 19:16
Show Gist options
  • Save hillar/4b014ba3abcc07a8c5c9 to your computer and use it in GitHub Desktop.
Save hillar/4b014ba3abcc07a8c5c9 to your computer and use it in GitHub Desktop.
send json file to elasticsearch in bulks
/*
Send a JSON file (one document per line) to elasticsearch in bulks.
Exit with an error if
- the file does not exist
- elasticsearch cannot be reached
- the elasticsearch cluster status is not green
*/
var fs = require("fs");
var http = require('http');
var byline = require('byline');
/**
 * Print the command-line usage text to stdout, one line per console.log
 * call, then terminate the process with exit code 0.
 */
function help() {
  var usageLines = [
    "usage:",
    "json2ela.js filename hostname:port/index/type <bulksize> ",
    "",
    " default index is undefined",
    " default type is undefined",
    " default bulksize is 1024"
  ];
  usageLines.forEach(function (text) {
    console.log(text);
  });
  process.exit(0);
}
// Entry point.
//   argv[2]  file to send (required), read line by line
//   argv[3]  target as hostname:port/index/type (every part optional)
//   argv[4]  number of lines per bulk request (default 1024)
// Prints usage when no file is given; otherwise checks that the file exists
// and that the cluster health status is "green" before streaming the file.
if ( process.argv.length < 3 ) {
  help();
} else {
  var fileName = process.argv[2];
  fs.stat(fileName, function(err, stat) {
    if ( err != null ) {
      console.error(fileName, " not found");
      process.exit(1);
    } else {
      // split "hostname:port/index/type" into host and the "/"-separated rest
      var target = process.argv[3] || 'localhost:9200/undefined/undefined';
      var hostAndRest = target.split(":");
      var host = hostAndRest[0] || "localhost";
      // bites = [port, index, type]; index and type may be missing
      var bites = (hostAndRest[1] || '9200/undefined/undefined').split("/");
      var port = parseInt(bites[0], 10) || 9200;
      // check elasticsearch connection
      var req = http.request({
        hostname: host,
        port: port,
        path: '/_cluster/health', // see http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/cluster-health.html
        method: 'GET'
      }, function(res) {
        // The body may arrive in several chunks, so collect all of them and
        // parse once the response has ended; parsing each chunk separately
        // can throw on partial JSON or start the upload more than once.
        var chunks = [];
        res.on('data', function (chunk) {
          chunks.push(chunk);
        });
        res.on('end', function () {
          var health;
          try {
            health = JSON.parse(Buffer.concat(chunks).toString()) || {};
          } catch (e) {
            console.error(host, 'health response is not JSON :: ', e);
            process.exit(1);
            return;
          }
          if ( health.status === "green" ) {
            // elastic is ok, send file
            var index = 'undefined';
            var type = 'undefined';
            if ( bites.length > 1 ) {
              index = bites[1];
              if ( bites.length > 2 ) {
                type = bites[2];
              }
            }
            var bulksize = parseInt(process.argv[4], 10) || 1024;
            // TODO put tail-stream here
            var stream = fs.createReadStream(fileName);
            stream = byline.createStream(stream);
            var bulk = [];
            var count = 0;
            stream.on('data', function(line) {
              count ++;
              bulk.push(line);
              if (count % bulksize === 0 ) {
                // pause reading until send2ela resumes the stream
                stream.pause();
                send2ela(host,port,index,type,bulk, stream,count-bulksize);
                bulk = [];
              }
            });
            stream.on('end',function(){
              // send leftovers
              send2ela(host,port,index,type,bulk,null,count-bulk.length);
              console.log('sent',count,'lines to',host+':'+port+'/'+index+'/'+type);
            });
          } else {
            console.error(host, " not green");
            process.exit(1);
          }
        });
      });
      req.on('error', function(e) {
        console.error(host, 'problem with request: ' + e.message);
        process.exit(1);
      });
      req.end();
    }
  });
}
/**
 * Send one batch of lines to the elasticsearch bulk endpoint.
 *
 * Every line is posted as the source of a "create" action to
 * /<index>/<type>/_bulk. Only the first line is checked for being valid
 * JSON; if it is not, the process exits with status 1. Per-item errors
 * reported by elasticsearch and non-2xx responses are written to stderr.
 * A failed request is treated as fatal (exit status 1), like the other
 * fatal conditions in this script.
 *
 * @param {string} host   elasticsearch host name (falls back to 'localhost')
 * @param {number} port   elasticsearch port (falls back to 9200)
 * @param {string} index  index name (falls back to 'undefined')
 * @param {string} type   type name (falls back to 'undefined')
 * @param {Array}  bufs   lines to send; each element is converted with toString()
 * @param {Object} stream optional paused stream, resumed once the response has ended
 * @param {number} start  number of lines preceding this batch, used to
 *                        compute line numbers in error reports
 */
function send2ela(host,port,index,type,bufs,stream,start){
  // nothing to send; also covers an empty leftover batch
  if (!bufs[0]) return;
  // check only first in bufs
  try {
    JSON.parse(bufs[0].toString());
  } catch (e) {
    console.error('not JSON :: ',e);
    process.exit(1);
    return;
  }
  var elastic = {
    hostname: host||'localhost',
    port: port||9200,
    path: '/'+(index || 'undefined')+'/'+(type || 'undefined')+'/_bulk',
    method: 'POST',
    // newer elasticsearch versions reject bulk bodies without a content type
    headers: { 'Content-Type': 'application/x-ndjson' }
  };
  var req = http.request(elastic, function(res) {
    var rcount = 0;
    var errors = false;
    // a non-2xx status means the whole request failed, not single items
    var failed = res.statusCode >= 300;
    // raw chunks are kept and joined at the end so multi-byte characters
    // split across chunk boundaries are decoded correctly
    var chunks = [];
    res.on('data', function (chunk) {
      rcount ++;
      if (rcount === 1) {
        // the bulk response starts like
        // {"took":7,"errors":true,"items":[{"create":{"_index":"test","_type":"eve","_id":
        // so looking at the first bytes is enough to detect item errors
        var head = chunk.toString().substr(0,80);
        errors = head.indexOf('"errors":true') !== -1;
      }
      if (errors || failed) { // collect elasticsearch response only on error
        chunks.push(chunk);
      }
    });
    res.on('end',function(){
      var body = Buffer.concat(chunks).toString();
      if (failed) {
        console.error('elasticsearch returned status:', res.statusCode, ';', body.substr(0,130));
      } else if (errors) {
        var parsed;
        try {
          parsed = JSON.parse(body);
        } catch (e) {
          console.error('elasticsearch response is not JSON :: ',e);
          process.exit(1);
          return;
        }
        var items = parsed.items || [];
        for (var k = 0; k < items.length; k++){
          var result = items[k].create;
          if (result && result.error) {
            // the error is a plain string in old elasticsearch versions and
            // an object in newer ones
            var reason = typeof result.error === 'string' ? result.error : JSON.stringify(result.error);
            var line = bufs[k] ? bufs[k].toString().substr(0,130) : '';
            console.error('error in line no:',start + k + 1 , '; status:',result.status,';',reason.substr(0,130), '... ; ',line);
          }
        }
      }
      if (stream) stream.resume();
    });
  });
  req.on('error', function(e) {
    console.error('problem with request: ' + e.message);
    // without this the paused stream is never resumed and the run would end
    // silently with exit status 0 after sending only part of the file
    process.exit(1);
  });
  // write bufs to request body: one action line followed by one source line
  for (var n = 0; n < bufs.length; n++) {
    req.write('{"create":{}}\n');
    req.write(bufs[n].toString() + '\n');
  }
  req.end();
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment