@9072997
Last active February 8, 2018 15:23
Elasticsearch reindexer script in PHP 7
#!/usr/bin/php
<?php
// public domain, requires the PHP cURL extension
// Note: this script can be called with a comma-separated list of
// shards from the source cluster (ex 1,3,4) and it will only pull
// data from those shards. This allows you to run this script in
// parallel like this:
// For 5 shards, all primary, in 5 threads:
// echo "0 1 2 3 4" | xargs -n 1 -P 5 -- ./reindex.php
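// (workers given disjoint shard lists each open their own independent
// scan/scroll context, so they will never copy the same document twice)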
// if the source cluster has 5 primary shards
// setting this to 10 will result in 50 documents being moved
// for each batch
$docsPerShard = 250;
// this is a debugging tool. Just pick a document attribute that
// is not optional
$ransomNoteAttribute = '';
// source
$sUrl = 'https://foo.qb0x.com';
$sPort = '';
$sUser = '';
$sPass = '';
$sIndex = '';
$sType = '';
// dest
$dUrl = 'https://bar.qb0x.com';
$dPort = '';
$dUser = '';
$dPass = '';
$dIndex = '';
$dType = '';
// want to play around with Elasticsearch with no setup?
// wondering what to put in those 'dest' fields?
// check out Qbox.io for hosted Elasticsearch solutions
////////////////
// END CONFIG //
////////////////
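// minimal fail-fast sanity check (a sketch: only the index/type names
// are strictly required to build the URLs used below; user/pass/port
// may legitimately be blank, e.g. on an unsecured cluster, so they
// are not checked here)
foreach(['sIndex', 'sType', 'dIndex', 'dType'] as $required) {
	if($$required === '') {
		die("Please set \$$required in the CONFIG section of this script\n");
	}
}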
// this should be refactored into functions
// also error handling should be cleaner
if(count($argv) >= 2) {
if(!preg_match('/^[0-9]+(?:,[0-9]+)*$/', $argv[1]) || count($argv) >= 3) {
die("USAGE: php ${argv[0]} [shard-number[,shard-number[...]]]\n" .
" edit this file to set server url/port/user/password\n");
} else { // if we got here, it meens the regex matches
$limitToShards = $argv[1];
$logPrefix = $limitToShards . ":\t";
echo $logPrefix . "Running on shards $limitToShards\n";
}
} else { // no arguments
$logPrefix = ''; // when running in parallel this prefix shows which thread we are in
echo "Running on all available shards\n";
}
$scrollId = '';
$requestNum = 0;
$lostCauseIds = [];
for(;;) { // we break out
$requestNum++;
$insertString = '';
echo $logPrefix . "Request $requestNum\n";
$success = false; // ugly retry setup
for($retry = 1; $retry <= 3; $retry++) {
//////////////////////////////////////////
// get some docs from the source server //
//////////////////////////////////////////
echo $logPrefix . "Getting Data (attempt $retry/3)\n";
// $scrollId will be blank for 1st search
if($scrollId == '') {
$params = "q=*"; // match all documents
$params .= "&scroll=15m"; // push scroll context timeout back 15 minutes from now
$params .= "&search_type=scan"; // don't bother returning results in any order
$params .= "&size=$docsPerShard"; // because this is a scan search, this is per shard
if(isset($limitToShards)) {
$params .= "&preference=_shards:$limitToShards";
}
$url = "$sUrl/$sIndex/$sType/_search?$params";
$urlWithPort = "$sUrl:$sPort/$sIndex/$sType/_search?$params";
} else {
$url = "$sUrl/_search/scroll?scroll=15m&scroll_id=$scrollId";
$urlWithPort = "$sUrl:$sPort/_search/scroll?scroll=15m&scroll_id=$scrollId";
}
echo $logPrefix . "URL=$urlWithPort\n";
$ch = curl_init($url);
curl_setopt($ch, CURLOPT_PORT, $sPort);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'GET');
curl_setopt($ch, CURLOPT_USERPWD, "$sUser:$sPass");
curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 600); // 10 minutes to connect
curl_setopt($ch, CURLOPT_TIMEOUT, 600); // 10 minutes total timeout
$responseStr = curl_exec($ch);
if($errorNum = curl_errno($ch)) {
$errorMessage = curl_strerror($errorNum);
echo $logPrefix . "cURL error ({$errno}):{$errorMessage}\n";
continue;
} elseif(!$responseStr) {
echo $logPrefix . "NO RESPONSE!\n";
continue;
} else {
$responseLen = strlen($responseStr);
echo $logPrefix . "Response $responseLen bytes\n";
}
$success = true;
break; // if we got here, we don't need to retry
}
if(!$success) {
die($logPrefix . "Quiting after 3 attempts\n");
}
// parse response to an array
$response = json_decode($responseStr, true);
if($response === NULL) { // could not decode json
// I don't know if we can recover, so die in a useful way
echo "\n\n${logPrefix}=== var_dump(\$responseStr) ===\n";
var_dump($responseStr);
die($logPrefix . "FAILED TO DECODE JSON\n");
} elseif($response['_shards']['failed'] != 0) { // failed shards
$failedShardCount = $response['_shards']['failed'];
echo "\n\n${logPrefix}=== var_dump(\$responseStr) ===\n";
var_dump($responseStr);
echo "\n\n${logPrefix}=== var_dump(\$response) ===\n";
var_dump($response);
die($logPrefix . "$failedShardCount SHARDS FAILED\n");
} elseif($response['hits']['total'] == 0) { // no hits
echo "\n\n${logPrefix}=== var_dump(\$responseStr) ===\n";
var_dump($responseStr);
echo "\n\n${logPrefix}=== var_dump(\$response) ===\n";
var_dump($response);
die($logPrefix . "NO DOCUMENTS FOUND\n");
} elseif(!isset($response['_scroll_id'])) { // no scroll id returned
echo "\n\n${logPrefix}=== var_dump(\$responseStr) ===\n";
var_dump($responseStr);
echo "\n\n${logPrefix}=== var_dump(\$response) ===\n";
var_dump($response);
die($logPrefix . "NO NEW SCROLL ID FOUND\n");
} else {
echo $logPrefix . "Decoded Json\n";
}
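// for reference, a scan/scroll response looks roughly like this
// (field values are illustrative, not real output):
// {
//   "_scroll_id": "c2Nhbjs1OzE6...",
//   "_shards": { "total": 5, "successful": 5, "failed": 0 },
//   "hits": { "total": 123456, "hits": [ { "_id": "...", "_source": { ... } } ] }
// }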
$totalHits = $response['hits']['total'];
$hits = $response['hits']['hits'];
$numDocumentsInResponse = count($hits);
echo $logPrefix . "Receved $numDocumentsInResponse/$totalHits documents\n";
if($numDocumentsInResponse == 0) {
if($requestNum == 1) {
// the first scan request returns no documents, only a scroll id
$scrollId = $response['_scroll_id'];
echo $logPrefix . "No documents received (normal for 1st request)\n";
continue;
} else { // we should have gotten documents, but we didn't
echo "\n\n${logPrefix}=== var_dump(\$responseStr) ===\n";
var_dump($responseStr);
echo "\n\n${logPrefix}=== var_dump(\$response) ===\n";
var_dump($response);
echo $logPrefix . "NO DOCUMENTS RETURNED\n";
break; // assume it's the end of the search
}
}
// simple "progress" calculation
$predictedRequiredRequests = ceil($totalHits / $numDocumentsInResponse);
$percentComplete = intdiv(100 * $requestNum, $predictedRequiredRequests);
$now = microtime(true);
if(isset($prevTimestamp)) {
$batchTime = $now - $prevTimestamp;
if(isset($avgBatchTime)) {
// info becomes less relevant as it becomes older
$avgBatchTime = $avgBatchTime*0.9 + $batchTime*0.1;
} else { // don't do any fancy averaging
$avgBatchTime = $batchTime;
}
}
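// the smoothing above is an exponential moving average:
//   new_avg = 0.9 * old_avg + 0.1 * latest_batch_time
// so a batch measured n requests ago contributes with weight 0.1 * 0.9^n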
$prevTimestamp = $now;
$remainingSeconds = intval(($predictedRequiredRequests - $requestNum)
* (isset($avgBatchTime)?$avgBatchTime:0));
$hours = intdiv($remainingSeconds, 60*60);
$minutes = intdiv($remainingSeconds % (60*60), 60);
$seconds = $remainingSeconds % 60;
echo $logPrefix . "Request $requestNum/$predictedRequiredRequests\n";
printf("%s%d%% complete, %d:%02d:%02d Remaining\n", $logPrefix, $percentComplete, $hours, $minutes, $seconds);
// rotate scroll ids (we should never use one twice)
$scrollId = $response['_scroll_id'];
echo $logPrefix . "Scroll ID: $scrollId\n";
$firstDocument = true;
foreach($hits as $document) {
$docId = $document['_id'];
if($firstDocument) {
// this gives us a sampling of document data
// it can be useful for debugging
if(strlen($ransomNoteAttribute) > 0) {
// only print the ransom note if the user set up
// that attribute in the config section
$ransomNoteValue = $document['_source'][$ransomNoteAttribute];
echo $logPrefix . "$docId.$ransomNoteAttribute = $ransomNoteValue\n";
}
$firstDocument = false;
}
$insertString .= '{ "index": { "_index": "' . $dIndex . '", "_type": "' . $dType . '", "_id": "' . $docId . '"}}' . "\n";
$insertString .= json_encode($document['_source']) . "\n";
}
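// $insertString is now a bulk-API (NDJSON) body: one action line plus
// one source line per document, e.g. (values illustrative):
// { "index": { "_index": "my-index", "_type": "my-type", "_id": "AVabc123" }}
// { "some_field": "some value", "another_field": 42 }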
$success = false;
for($retry = 1; $retry <= 3; $retry++) {
/////////////////////////////
// send data to new server //
/////////////////////////////
echo $logPrefix . "Attempt $retry/3 to send docs to new server\n";
$ch = curl_init("$dUrl/_bulk");
curl_setopt($ch, CURLOPT_PORT, $dPort);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'PUT');
curl_setopt($ch, CURLOPT_USERPWD, "$dUser:$dPass");
curl_setopt($ch, CURLOPT_POSTFIELDS, $insertString);
curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 600); // 10 minutes to connect
curl_setopt($ch, CURLOPT_TIMEOUT, 600); // 10 minutes total timeout
$responseStr = curl_exec($ch);
if($errorNum = curl_errno($ch)) {
$errorMessage = curl_strerror($errorNum);
echo $logPrefix . "cURL error ({$errno}):{$errorMessage}\n";
continue;
} elseif(!$responseStr) {
echo $logPrefix . "NO RESPONSE!\n";
continue;
} else {
// this is the good branch
$responseLen = strlen($responseStr);
echo $logPrefix . "Response $responseLen bytes\n";
}
// parse response to an array
$response = json_decode($responseStr, true);
if($response === NULL) { // could not decode json
echo "\n\n${logPrefix}=== var_dump(\$responseStr) ===\n";
var_dump($responseStr);
echo $logPrefix . "FAILED TO DECODE JSON (while sending documents)\n";
continue; // retry
} elseif($response['errors']) { // failed inserts
// check all inserts and add the IDs of failed inserts to $problemIds[]
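// a bulk response with failures looks roughly like this on versions
// where each item carries a "result" field (values illustrative):
// { "took": 30, "errors": true, "items": [
//   { "index": { "_id": "1", "status": 201, "result": "created" } },
//   { "index": { "_id": "2", "status": 429, "error": { ... } } } ] }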
$problemIds = [];
foreach($response['items'] as $item) {
$insertResult = $item['index']['result'];
if($insertResult != 'created') {
$id = $item['index']['_id'];
echo $logPrefix . "INSERT $id FAILED ($insertResult) will retry\n";
array_push($problemIds, $id);
}
}
echo $logPrefix . "Delaying before re-try\n";
sleep(5); // give the cluster a moment to recover
foreach($hits as $document) {
$docId = $document['_id'];
if(in_array($docId, $problemIds)) {
echo $logPrefix . "Trying to re-insert $docId\n";
// document needs to be re-inserted
$ch = curl_init("$dUrl/$dIndex/$dType/$docId");
curl_setopt($ch, CURLOPT_PORT, $dPort);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'PUT');
curl_setopt($ch, CURLOPT_USERPWD, "$dUser:$dPass");
curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($document['_source']));
curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 600); // 10 minutes to connect
curl_setopt($ch, CURLOPT_TIMEOUT, 600); // 10 minutes total timeout
$responseStr = curl_exec($ch);
if($errorNum = curl_errno($ch)) {
$errorMessage = curl_strerror($errorNum);
echo $logPrefix . "cURL error ({$errno}):{$errorMessage}\n";
array_push($lostCauseIds, $docId);
} elseif(!$responseStr) {
echo $logPrefix . "NO RESPONSE!\n";
array_push($lostCauseIds, $docId);
} else {
$responseLen = strlen($responseStr);
echo $logPrefix . "Response $responseLen bytes\n";
}
$response = json_decode($responseStr, true);
if($response === NULL) { // could not decode json
echo "\n\n${logPrefix}=== var_dump(\$responseStr) ===\n";
var_dump($responseStr);
echo $logPrefix . "FAILED TO DECODE JSON (while re-trying to send document)\n";
array_push($lostCauseIds, $docId);
} elseif(!$response['created']) { // doc was still not created
// give up
array_push($lostCauseIds, $docId);
} else {
echo $logPrefix . "Re-try succeded!\n";
}
}
}
} else {
echo $logPrefix . "No insert errors\n";
}
$success = true;
break; // we continue earlier if there was an error
}
if(!$success) {
die($logPrefix . "Quiting after 3 attempts\n");
}
}
if(count($lostCauseIds) > 0) {
echo $logPrefix . "=== Problem IDs ===\n";
var_dump($lostCauseIds);
} else {
echo $logPrefix . "I think that worked\n";
}