Last active
February 8, 2018 15:23
-
-
Save 9072997/6aebb6ee85c4be131aadea6842659d47 to your computer and use it in GitHub Desktop.
Elasticsearch reindexer script in PHP 7
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/php
<?php
// public domain, requires the PHP curl extension
// Note: this script can be called with a comma separated list of
// shards from the source cluster (ex 1,3,4) and it will only pull
// data from those shards. This allows you to run this script in
// parallel like this:
//   For 5 shards, all primary, in 5 threads:
//   echo "0 1 2 3 4" | xargs -n 1 -P 100 -- ./reindex.php

// If the source cluster has 5 primary shards,
// setting this to 10 will result in 50 documents being moved
// for each batch (scan-search "size" is per shard).
$docsPerShard = 250;

// This is a debugging tool. Name a document attribute that is not
// optional and its value is printed for the first doc of each batch.
$ransomNoteAttribute = '';

// source cluster
$sUrl = 'https://foo.qb0x.com';
$sPort = '';
$sUser = '';
$sPass = '';
$sIndex = '';
$sType = '';

// destination cluster
$dUrl = 'https://bar.qb0x.com';
$dPort = '';
$dUser = '';
$dPass = '';
$dIndex = '';
$dType = '';

// want to play around with Elasticsearch with no setup?
// wondering what to put in those 'dest' fields?
// checkout Qbox.io for hosted Elasticsearch solutions

////////////////
// END CONFIG //
////////////////

// TODO: this should be re-factored into functions
// TODO: error handling should be cleaner
// Parse the optional shard-list argument (e.g. "1,3,4").
// Sets $limitToShards (only when given) and $logPrefix, which tags every
// log line so parallel invocations can be told apart.
if(count($argv) >= 2) {
    // exactly one argument, and it must be a comma separated list of shard numbers
    if(!preg_match('/^[0-9]+(?:,[0-9]+)*$/', $argv[1]) || count($argv) >= 3) {
        // {$argv[0]} (not the deprecated ${argv[0]}) keeps this working on PHP 8.2+
        die("USAGE: php {$argv[0]} [shard-number[,shard-number[...]]]\n" .
            " edit this file to set server url/port/user/password\n");
    } else { // if we got here, it means the regex matched
        $limitToShards = $argv[1];
        $logPrefix = $limitToShards . ":\t";
        echo $logPrefix . "Running on shards $limitToShards\n";
    }
} else { // no arguments
    $logPrefix = ''; // if we are running in parallel this shows which thread we are
    echo "Running on all avalible shards\n";
}
// scroll / progress state
$scrollId = '';     // current scroll cursor; empty means "start a new scan"
$requestNum = 0;    // batches requested so far
$lostCauseIds = []; // IDs of documents that failed even after individual retries
for(;;) { // we break out when the source stops returning documents
    $requestNum++;
    $insertString = ''; // newline-delimited _bulk payload built from this batch
    echo $logPrefix . "Request $requestNum\n";
    $success = false; // ugly retry setup
    for($retry = 1; $retry <= 3; $retry++) {
        //////////////////////////////////////////
        // get some docs from the source server //
        //////////////////////////////////////////
        echo $logPrefix . "Getting Data (attempt $retry/3)\n";
        // $scrollId will be blank for the 1st search only
        if($scrollId == '') {
            $params = "q=*"; // match all documents
            $params .= "&scroll=15m"; // push scroll context timeout back 15 minutes from now
            $params .= "&search_type=scan"; // no scoring/ordering (requires Elasticsearch 1.x)
            $params .= "&size=$docsPerShard"; // because this is a scan search, this is per shard
            if(isset($limitToShards)) {
                $params .= "&preference=_shards:$limitToShards";
            }
            $url = "$sUrl/$sIndex/$sType/_search?$params";
            $urlWithPort = "$sUrl:$sPort/$sIndex/$sType/_search?$params";
        } else {
            // scroll IDs are opaque and may contain URL-unsafe characters
            // (e.g. '+', '='), so percent-encode before building the URL
            $encodedScrollId = urlencode($scrollId);
            $url = "$sUrl/_search/scroll?scroll=15m&scroll_id=$encodedScrollId";
            $urlWithPort = "$sUrl:$sPort/_search/scroll?scroll=15m&scroll_id=$encodedScrollId";
        }
        echo $logPrefix . "URL=$urlWithPort\n";
        $ch = curl_init($url);
        curl_setopt($ch, CURLOPT_PORT, $sPort);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'GET');
        curl_setopt($ch, CURLOPT_USERPWD, "$sUser:$sPass");
        // NOTE: a stray CURLOPT_POSTFIELDS (copy/pasted from the send loop
        // below) was removed here -- this GET request carries no body
        curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 600); // 10 minutes to connect
        curl_setopt($ch, CURLOPT_TIMEOUT, 600); // 10 minutes total timeout
        $responseStr = curl_exec($ch);
        $errorNum = curl_errno($ch);
        curl_close($ch); // release the handle; this loop can run for hours
        if($errorNum) {
            $errorMessage = curl_strerror($errorNum);
            echo $logPrefix . "cURL error ({$errorNum}):{$errorMessage}\n";
            continue;
        } elseif(!$responseStr) {
            echo $logPrefix . "NO RESPONSE!\n";
            continue;
        } else {
            $responseLen = strlen($responseStr);
            echo $logPrefix . "Response $responseLen bytes\n";
        }
        $success = true;
        break; // if we got here, we don't need to retry
    }
    if(!$success) {
        die($logPrefix . "Quiting after 3 attempts\n");
    }
    // parse response to an array
    $response = json_decode($responseStr, true);
    if($response === NULL) { // could not decode json
        // I don't know if we can recover, so die in a useful way
        echo "\n\n{$logPrefix}=== var_dump(\$responseStr) ===\n";
        var_dump($responseStr);
        die($logPrefix . "FAILED TO DECODE JSON\n");
    } elseif($response['_shards']['failed'] != 0) { // failed shards
        $failedShardCount = $response['_shards']['failed'];
        echo "\n\n{$logPrefix}=== var_dump(\$responseStr) ===\n";
        var_dump($responseStr);
        echo "\n\n{$logPrefix}=== var_dump(\$response) ===\n";
        var_dump($response);
        die($logPrefix . "$failedShardCount SHARDS FAILED\n");
    } elseif($response['hits']['total'] == 0) { // no hits
        echo "\n\n{$logPrefix}=== var_dump(\$responseStr) ===\n";
        var_dump($responseStr);
        echo "\n\n{$logPrefix}=== var_dump(\$response) ===\n";
        var_dump($response);
        die($logPrefix . "NO DOCUMENTS FOUND\n");
    } elseif(!isset($response['_scroll_id'])) { // no scroll id returned
        echo "\n\n{$logPrefix}=== var_dump(\$responseStr) ===\n";
        var_dump($responseStr);
        echo "\n\n{$logPrefix}=== var_dump(\$response) ===\n";
        var_dump($response);
        die($logPrefix . "NO NEW SCROLL ID FOUND\n");
    } else {
        echo $logPrefix . "Decoded Json\n";
    }
    $totalHits = $response['hits']['total'];
    $hits = $response['hits']['hits'];
    $numDocumentsInResponse = count($hits);
    echo $logPrefix . "Receved $numDocumentsInResponse/$totalHits documents\n";
    if($numDocumentsInResponse == 0) {
        if($requestNum == 1) {
            // a scan search's first request returns no documents, only a scroll id
            $scrollId = $response['_scroll_id'];
            echo $logPrefix . "No documents receved (normal for 1st request)\n";
            continue;
        } else { // we should have gotten documents, but we didn't
            echo "\n\n{$logPrefix}=== var_dump(\$responseStr) ===\n";
            var_dump($responseStr);
            echo "\n\n{$logPrefix}=== var_dump(\$response) ===\n";
            var_dump($response);
            echo $logPrefix . "NO DOCUMENTS RETURNED\n";
            break; // assume it's the end of the search
        }
    }
    // simple "progress" calculation
    // cast to int: intdiv() below requires integer operands, ceil() returns float
    $predictedRequiredRequests = (int)ceil($totalHits / $numDocumentsInResponse);
    $percentComplete = intdiv(100 * $requestNum, $predictedRequiredRequests);
    $now = microtime(true);
    if(isset($prevTimestamp)) {
        $batchTime = $now - $prevTimestamp;
        if(isset($avgBatchTime)) {
            // exponential moving average: info becomes less relevant as it ages
            $avgBatchTime = $avgBatchTime*0.9 + $batchTime*0.1;
        } else { // first sample, nothing to average against yet
            $avgBatchTime = $batchTime;
        }
    }
    $prevTimestamp = $now;
    $remainingSeconds = intval(($predictedRequiredRequests - $requestNum)
                               * (isset($avgBatchTime)?$avgBatchTime:0));
    $hours = intdiv($remainingSeconds, 60*60);
    $minutes = intdiv($remainingSeconds % (60*60), 60);
    $seconds = $remainingSeconds % 60;
    echo $logPrefix . "Request $requestNum/$predictedRequiredRequests\n";
    printf("%s%d%% complete, %d:%02d:%02d Remaining\n", $logPrefix, $percentComplete, $hours, $minutes, $seconds);
    // rotate scroll ids (we should never use one twice)
    $scrollId = $response['_scroll_id'];
    echo $logPrefix . "Scroll ID: $scrollId\n";
    $firstDocument = true;
    foreach($hits as $document) {
        $docId = $document['_id'];
        if($firstDocument) {
            // this gives us a sampling of document data, useful for debugging
            if(strlen($ransomNoteAttribute) > 0) {
                // only print the ransom note if the user set up
                // that attribute in the config section
                $ransomNoteValue = $document['_source'][$ransomNoteAttribute];
                echo $logPrefix . "$docId.$ransomNoteAttribute = $ransomNoteValue\n";
            }
            $firstDocument = false;
        }
        // _bulk format: one action line followed by one source line
        $insertString .= '{ "index": { "_index": "' . $dIndex . '", "_type": "' . $dType . '", "_id": "' . $docId . '"}}' . "\n";
        $insertString .= json_encode($document['_source']) . "\n";
    }
    $success = false;
    for($retry = 1; $retry <= 3; $retry++) {
        /////////////////////////////
        // send data to new server //
        /////////////////////////////
        echo $logPrefix . "Attempt $retry/3 to send docs to new server\n";
        $ch = curl_init("$dUrl/_bulk");
        curl_setopt($ch, CURLOPT_PORT, $dPort);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'PUT');
        curl_setopt($ch, CURLOPT_USERPWD, "$dUser:$dPass");
        curl_setopt($ch, CURLOPT_POSTFIELDS, $insertString);
        curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 600); // 10 minutes to connect
        curl_setopt($ch, CURLOPT_TIMEOUT, 600); // 10 minutes total timeout
        $responseStr = curl_exec($ch);
        $errorNum = curl_errno($ch);
        curl_close($ch); // release the handle before retrying or moving on
        if($errorNum) {
            $errorMessage = curl_strerror($errorNum);
            echo $logPrefix . "cURL error ({$errorNum}):{$errorMessage}\n";
            continue;
        } elseif(!$responseStr) {
            echo $logPrefix . "NO RESPONSE!\n";
            continue;
        } else {
            // this is the good branch
            $responseLen = strlen($responseStr);
            echo $logPrefix . "Response $responseLen bytes\n";
        }
        // parse response to an array
        $response = json_decode($responseStr, true);
        if($response === NULL) { // could not decode json
            echo "\n\n{$logPrefix}=== var_dump(\$responseStr) ===\n";
            var_dump($responseStr);
            echo $logPrefix . "FAILED TO DECODE JSON (while sending documents)\n";
            continue; // retry
        } elseif(!empty($response['errors'])) { // failed inserts
            // check all inserts and add the IDs of failed inserts to $problemIds[]
            $problemIds = [];
            foreach($response['items'] as $item) {
                // error items may lack 'result' entirely, hence the ?? guard;
                // 'updated' means the doc already exists (e.g. on a re-run),
                // which still counts as delivered
                $insertResult = $item['index']['result'] ?? '';
                if($insertResult != 'created' && $insertResult != 'updated') {
                    $id = $item['index']['_id'];
                    echo $logPrefix . "INSERT $id FAILED ($insertResult) will retry\n";
                    array_push($problemIds, $id);
                }
            }
            echo $logPrefix . "Delaying before re-try\n";
            sleep(5); // give the cluster a moment to recover
            foreach($hits as $document) {
                $docId = $document['_id'];
                if(in_array($docId, $problemIds, true)) { // strict: ids are strings
                    echo $logPrefix . "Trying to re-insert $docId\n";
                    // document needs to be re-inserted, one PUT per document
                    $ch = curl_init("$dUrl/$dIndex/$dType/$docId");
                    curl_setopt($ch, CURLOPT_PORT, $dPort);
                    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
                    curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'PUT');
                    curl_setopt($ch, CURLOPT_USERPWD, "$dUser:$dPass");
                    curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($document['_source']));
                    curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 600); // 10 minutes to connect
                    curl_setopt($ch, CURLOPT_TIMEOUT, 600); // 10 minutes total timeout
                    $responseStr = curl_exec($ch);
                    $errorNum = curl_errno($ch);
                    curl_close($ch);
                    if($errorNum) {
                        $errorMessage = curl_strerror($errorNum);
                        echo $logPrefix . "cURL error ({$errorNum}):{$errorMessage}\n";
                        array_push($lostCauseIds, $docId);
                        continue; // nothing to decode; don't count this doc twice
                    } elseif(!$responseStr) {
                        echo $logPrefix . "NO RESPONSE!\n";
                        array_push($lostCauseIds, $docId);
                        continue; // nothing to decode; don't count this doc twice
                    } else {
                        $responseLen = strlen($responseStr);
                        echo $logPrefix . "Response $responseLen bytes\n";
                    }
                    $response = json_decode($responseStr, true);
                    if($response === NULL) { // could not decode json
                        echo "\n\n{$logPrefix}=== var_dump(\$responseStr) ===\n";
                        var_dump($responseStr);
                        echo $logPrefix . "FAILED TO DECODE JSON (while re-trying to send document)\n";
                        array_push($lostCauseIds, $docId);
                    } elseif(($response['result'] ?? '') != 'created'
                          && ($response['result'] ?? '') != 'updated'
                          && empty($response['created'])) {
                        // doc was still not indexed: neither the modern 'result'
                        // field nor the legacy 'created' flag says so -- give up
                        array_push($lostCauseIds, $docId);
                    } else {
                        echo $logPrefix . "Re-try succeded!\n";
                    }
                }
            }
        } else {
            echo $logPrefix . "No insert errors\n";
        }
        $success = true;
        break; // we continue earlier if there was an error
    }
    if(!$success) {
        die($logPrefix . "Quiting after 3 attempts\n");
    }
}
// Final report: dump any document IDs that never made it to the
// destination cluster so the operator can copy them by hand.
if(count($lostCauseIds) > 0) {
    echo $logPrefix . "=== Problem IDs ===\n";
    var_dump($lostCauseIds);
} else {
    echo $logPrefix . "I think that worked\n";
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment