Skip to content

Instantly share code, notes, and snippets.

@Chunlin-Li
Created April 17, 2016 19:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Chunlin-Li/631d4aad9b34cc7fce38147a22862411 to your computer and use it in GitHub Desktop.
Save Chunlin-Li/631d4aad9b34cc7fce38147a22862411 to your computer and use it in GitHub Desktop.
function sendToES(topic, list, noRetry) {
let _list = list.splice(0);
reqHelper.post('/_bulk', new Buffer(_list.concat('').join(LF)), (resp, code) => {
/** 错误处理*/
if (code !== 200) {
console.log('HTTP FAILED CODE:', code);
dumpDisk(_list);
} else if (resp.indexOf('"errors":true,') !== -1) {
let errorInfo = []; // errorInfo structure: [{i:index, t: error_type, s: status},{} ...]
let items = JSON.parse(resp).items;
for (var j = 0, val; j < items.length; j ++ ) {
val = (items[j]['create'] || items[j]['update']);
if (val.status >= 400) {
errorInfo.push({
i: j,
s: val.status,
t: val.error.type
});
}
}
console.log('error Info:', JSON.stringify(errorInfo.reduce((p, c) => {
p[c.t] = p[c.t] + 1 || 1;
return p;
}, {})));
resp = null; // prevent closure hold.
/** 无重试*/
// if (noRetry){
let failed = errorInfo.map(item => _list[item.i]);
dumpDisk(failed);
// }
/** 重试*/
if (!noRetry) {
let retry = errorInfo.map(item => _list[item.i].replace(/_index":"[-0-9]{10}/, '_index":"' + dayPlus(/_index":"([-0-9]{10})/.exec(_list[item.i])[1], -1)));
//console.log('retry content: ', JSON.stringify(retry));
/** 避免循环重试*/
sendToES(topic, retry, true);
}
}
}, err => {
console.error('http request error:', err, err.stack);
dumpDisk(_list);
});
console.log(new Date().toJSON(), 'bulk request send', topic);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment