Created
December 8, 2017 03:54
-
-
Save 9072997/e59e98ef0d72feadb025afdd05b14113 to your computer and use it in GitHub Desktop.
PHP script to re-index an Elasticsearch index (with only one type) and strip the id field (storing it in _id)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"mappings": { | |
"my_new_type" : { | |
"properties" : { | |
"name" : { | |
"type" : "string", | |
"analyzer": "english" | |
}, | |
"age" : { | |
"type" : "long" | |
}, | |
"last_updated" : { | |
"type" : "long" | |
} | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env php | |
<?php | |
// Public domain. Requires the PHP cURL extension.

// Value sent as the `size` parameter of the initial scan search.
// Because it is a scan search the size applies per shard:
// if the source cluster has 5 primary shards
// setting this to 10 will result in 50 documents being moved
// for each batch
$docsPerShard = 250;

// source
// Base URL (no trailing slash), port handed to cURL, basic-auth credentials
// (sent as "user:pass"), and the index/type that form the search URL path.
$sUrl = 'https://foo.qb0x.com';
$sPort = '';
$sUser = '';
$sPass = '';
$sIndex = '';
$sType = '';

// dest (WILL DELETE YOUR EXISTING DEST INDEX)
// Same layout as the source settings. $dIndex is deleted during setup if it
// already exists; $dIndex and $dType are written into every bulk action line.
$dUrl = 'https://bar.qb0x.com';
$dPort = '';
$dUser = '';
$dPass = '';
$dIndex = '';
$dType = '';
/**
 * Emit diagnostic context right before the script aborts.
 *
 * When the caller supplied a description of what it was doing, that text is
 * printed (preceded by two newlines, followed by one). When no description
 * was supplied, a PHP backtrace is printed instead so the failing call site
 * can still be located.
 *
 * @param string|null $stateInfo Caller-provided context, or NULL for a backtrace.
 */
function echoOrTrace($stateInfo) {
    if ($stateInfo !== null) {
        echo "\n\n" . $stateInfo . "\n";
        return;
    }

    debug_print_backtrace();
}
/**
 * Perform one HTTP request with cURL and return the raw response body.
 *
 * Any transport-level cURL failure terminates the script via die() after
 * printing $stateInfo (or a backtrace when $stateInfo is NULL). HTTP status
 * codes are NOT inspected here; callers examine the returned body.
 *
 * @param string      $method     HTTP verb, passed to CURLOPT_CUSTOMREQUEST.
 * @param string      $url        Full request URL.
 * @param int|string  $port       Port passed to CURLOPT_PORT. An empty string or
 *                                NULL leaves the port to be derived from $url.
 * @param string|null $body       Request body, or NULL for no body.
 * @param string|null $authString "user:password" for CURLOPT_USERPWD, or NULL.
 * @param string|null $stateInfo  Context printed if the request fails.
 * @return string The response body.
 */
function httpRequest($method, $url, $port = 80, $body = NULL, $authString = NULL, $stateInfo = NULL) {
    $ch = curl_init($url);
    if ($ch === false) {
        echoOrTrace($stateInfo);
        die("cURL error: could not initialise a handle for {$url}\n");
    }

    // The script's port settings default to ''; in that case do not override
    // the port so cURL picks the one implied by the URL.
    if (!is_null($port) && $port !== '') {
        curl_setopt($ch, CURLOPT_PORT, (int) $port);
    }
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
    curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method);
    if (!is_null($authString)) {
        curl_setopt($ch, CURLOPT_USERPWD, $authString);
    }
    if (!is_null($body)) {
        curl_setopt($ch, CURLOPT_POSTFIELDS, $body);
    }
    curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 600); // 10 minutes to connect
    curl_setopt($ch, CURLOPT_TIMEOUT, 600); // 10 minutes total timeout

    $responseStr = curl_exec($ch);

    $errorNum = curl_errno($ch);
    if ($errorNum !== 0) {
        echoOrTrace($stateInfo);
        $errorMessage = curl_strerror($errorNum);
        // Report the actual cURL error number (the original interpolated an
        // undefined variable here, so the number was always blank).
        die("cURL error ({$errorNum}):{$errorMessage}\n");
    }

    return $responseStr;
}
/**
 * Build the URL of the initial scan/scroll search.
 *
 * @param string     $url     Base URL of the cluster (no trailing slash).
 * @param string     $index   Index to read from.
 * @param string     $type    Document type to read from.
 * @param int|string $numDocs Value of the `size` parameter; for a scan search
 *                            this is applied per shard.
 * @return string The complete `_search` URL including its query string.
 */
function searchUrlBuilder($url, $index, $type, $numDocs) {
    $query = implode('&', array(
        'q=*',               // match all documents
        'scroll=15m',        // keep the scroll context alive for 15 minutes
        'search_type=scan',  // results are not needed in any particular order
        "size=$numDocs",     // per shard, because this is a scan search
    ));

    return $url . '/' . $index . '/' . $type . '/_search?' . $query;
}
/**
 * Build the URL that fetches the next page of an open scroll.
 *
 * The scroll id is percent-encoded so that characters with a special meaning
 * in a query string (such as `+`, `/`, `=` or `&`) reach the server intact.
 *
 * @param string $url      Base URL of the cluster (no trailing slash).
 * @param string $scrollId Scroll id returned by the previous search/scroll call.
 * @return string The `_search/scroll` URL; each call renews the scroll for 15 minutes.
 */
function scrollUrlBuilder($url, $scrollId) {
    return "$url/_search/scroll?scroll=15m&scroll_id=" . rawurlencode($scrollId);
}
/**
 * Decode a search/scroll response and abort the script if it is unusable.
 *
 * The script dies (after dumping the response and printing $stateInfo or a
 * backtrace) when:
 *   - the body is not valid JSON,
 *   - the body lacks `_shards.failed` or `hits.total` (for example an
 *     error response from the server),
 *   - one or more shards failed,
 *   - the search matched no documents.
 *
 * @param string      $json      Raw response body.
 * @param string|null $stateInfo Context printed on failure.
 * @return array The decoded response as an associative array.
 */
function parseResults($json, $stateInfo = NULL) {
    // parse response to an array
    $data = json_decode($json, true);
    if ($data === NULL) { // could not decode json
        echo "\n\n=== var_dump(\$json) ===\n";
        var_dump($json);
        echoOrTrace($stateInfo);
        die("FAILED TO DECODE JSON");
    }

    if (!is_array($data) || !isset($data['_shards']['failed']) || !isset($data['hits']['total'])) {
        // Without this guard a server error body fell through to the
        // "no hits" branch and was misreported as an empty result.
        $failure = "UNEXPECTED RESPONSE (NOT A SEARCH RESULT)\n";
    } elseif ($data['_shards']['failed'] != 0) { // failed shards
        $failedShardCount = $data['_shards']['failed'];
        $failure = "$failedShardCount SHARDS FAILED\n";
    } elseif ($data['hits']['total'] == 0) { // no hits
        $failure = "NO DOCUMENTS FOUND\n";
    } else {
        return $data;
    }

    echo "\n\n=== var_dump(\$json) ===\n";
    var_dump($json);
    echo "\n\n=== var_dump(\$data) ===\n";
    var_dump($data);
    echoOrTrace($stateInfo);
    die($failure);
}
/**
 * Extract the scroll id from a decoded search/scroll response.
 *
 * Dies (after dumping the response and printing $stateInfo or a backtrace)
 * when the response carries no `_scroll_id`.
 *
 * @param array       $resultArr Decoded response.
 * @param string|null $stateInfo Context printed on failure.
 * @return string The scroll id to use for the next page request.
 */
function nextScrollId($resultArr, $stateInfo = NULL) {
    if (!isset($resultArr['_scroll_id'])) { // no scroll id returned
        echo "\n\n=== var_dump(\$resultArr) ===\n";
        // Dump the response itself (the original used `$$resultArr`, a
        // variable-variable lookup that dumped NULL instead).
        var_dump($resultArr);
        echoOrTrace($stateInfo);
        die("NO NEW SCROLL ID FOUND\n");
    }
    return $resultArr['_scroll_id'];
}
/**
 * Render one search hit as the two-line entry expected by the bulk API:
 * an `index` action line followed by the document source, each terminated
 * by a newline.
 *
 * The action line is produced with json_encode() so that ids, index names or
 * type names containing quotes or backslashes are escaped correctly. The id
 * is always emitted as a JSON string, as it was before.
 *
 * @param array  $docArr   A hit with at least `_id` and `_source`.
 * @param string $newIndex Destination index name.
 * @param string $newType  Destination type name.
 * @return string Two newline-terminated JSON lines.
 */
function arrToBulkString($docArr, $newIndex, $newType) {
    $action = array(
        'index' => array(
            '_index' => (string) $newIndex,
            '_type' => (string) $newType,
            '_id' => (string) $docArr['_id'],
        ),
    );

    $source = $docArr['_source'];
    if ($source === array()) {
        // An empty PHP array would encode as `[]`; a document must be an object.
        $source = new stdClass();
    }

    return json_encode($action) . "\n" . json_encode($source) . "\n";
}
/**
 * Inspect a bulk API response and abort the script unless it reports success.
 *
 * The script dies (after dumping the response and printing $stateInfo or a
 * backtrace) when the body is not valid JSON, when it has no `errors` flag
 * at all (the request was rejected as a whole rather than per item), or when
 * `errors` is truthy.
 *
 * @param string      $json      Raw bulk response body.
 * @param string|null $stateInfo Context printed on failure.
 * @return void
 */
function dieOnBulkErrors($json, $stateInfo = NULL) {
    $data = json_decode($json, true);
    if ($data === NULL) { // could not decode json
        echo "\n\n=== var_dump(\$json) ===\n";
        var_dump($json);
        echoOrTrace($stateInfo);
        die("FAILED TO DECODE JSON");
    }

    if (!is_array($data) || !array_key_exists('errors', $data)) {
        // Previously a response without an `errors` key was treated as success.
        $failure = "BULK REQUEST REJECTED (NO 'errors' FLAG IN RESPONSE)\n";
    } elseif ($data['errors']) { // some failed inserts
        $failure = "ONE OR MORE INSERTS FAILED\n";
    } else {
        return;
    }

    echo "\n\n=== var_dump(\$json) ===\n";
    var_dump($json);
    echo "\n\n=== var_dump(\$data) ===\n";
    var_dump($data);
    echoOrTrace($stateInfo);
    die($failure);
}
/**
 * POST a bulk payload to the cluster's `_bulk` endpoint and abort the script
 * if the response reports any failure.
 *
 * @param string     $insertString Newline-delimited bulk payload.
 * @param string     $url          Base URL of the cluster (no trailing slash).
 * @param int|string $port         Port to connect to.
 * @param string     $username     Basic-auth user name.
 * @param string     $password     Basic-auth password.
 * @return void
 */
function sendBulk($insertString, $url, $port, $username, $password) {
    if ($insertString === '') {
        // Nothing to insert; an empty bulk body would only produce an error.
        return;
    }

    // "{$url}" instead of "${url}": the latter form is deprecated since PHP 8.2.
    $status = httpRequest('POST', "{$url}/_bulk", $port, $insertString, "$username:$password");
    dieOnBulkErrors($status);
}
/**
 * Fetch the next page of an open scroll.
 *
 * @param string     $url      The original base URL, without any endpoint appended.
 * @param int|string $port     Port to connect to.
 * @param string     $scrollId Scroll id from the previous response.
 * @param string     $user     Basic-auth user name.
 * @param string     $pass     Basic-auth password.
 * @return string Raw response body of the scroll request.
 */
function getNextPage($url, $port, $scrollId, $user, $pass) {
    $credentials = $user . ':' . $pass;
    $scrollUrl = scrollUrlBuilder($url, $scrollId);

    return httpRequest('GET', $scrollUrl, $port, NULL, $credentials);
}
///////////
// setup //
///////////

// Validate the command line and read the mapping file BEFORE touching the
// destination cluster, so that `--help`, a missing argument or a wrong file
// name can never delete the destination index.
if (count($argv) != 2 || $argv[1] == '--help') {
    die("Usage: {$argv[0]} {<mappingFile> | --dynamic}\n" .
        "Note: the mappings file must have the correct dest type name\n");
}

$mapping = NULL; // stays NULL when elasticsearch should generate the mappings
if ($argv[1] != '--dynamic') {
    // if we are not going to let elasticsearch generate mappings
    // read the mapping from a file
    if (!file_exists($argv[1])) {
        die("File {$argv[1]} does not exist\n");
    }
    $mapping = file_get_contents($argv[1]);
    if ($mapping === false) {
        die("File {$argv[1]} could not be read\n");
    }
}

// check if our dest index already exists, and delete it if so
$indicies = httpRequest('GET', "$dUrl/_cat/indices?format=json", $dPort, NULL, "$dUser:$dPass");
$indicies = json_decode($indicies, true);
if (!is_array($indicies)) {
    die("Could not list the indices of the destination cluster\n");
}
foreach ($indicies as $index) {
    if (is_array($index) && isset($index['index']) && $index['index'] == $dIndex) {
        httpRequest('DELETE', "$dUrl/$dIndex", $dPort, NULL, "$dUser:$dPass");
        break;
    }
}

// push mapping (creates the destination index with the supplied mappings)
if (!is_null($mapping)) {
    httpRequest('PUT', "$dUrl/$dIndex", $dPort, $mapping, "$dUser:$dPass");
}
//////////////////////
// main code begins //
//////////////////////

$searchUrl = searchUrlBuilder($sUrl, $sIndex, $sType, $docsPerShard);
// the initial scan request won't return any documents, but will give us a scroll id
$result = httpRequest('GET', $searchUrl, $sPort, NULL, "$sUser:$sPass");
// turns the results into an associative array
$resultArr = parseResults($result);
$scrollId = nextScrollId($resultArr);

$totalDocsReceved = 0;
while (true) { // until we die()
    echo '.'; // progress bar of sorts

    $result = getNextPage($sUrl, $sPort, $scrollId, $sUser, $sPass);
    $resultArr = parseResults($result);
    // every page may hand back a new scroll id; always continue with the latest
    $scrollId = nextScrollId($resultArr);

    $hits = $resultArr['hits']['hits'];
    if (count($hits) == 0) {
        // an empty page means the scroll is exhausted
        echo "Moved $totalDocsReceved docs\n";
        die("Looks like we are done\n");
    }

    $insertString = '';
    foreach ($hits as $hit) {
        // this is custom error checking for this specific case
        if (isset($hit['_source']['id'])) {
            $esId = $hit['_id'];
            $nonOfficialId = $hit['_source']['id'];
            // Compare as strings: a loose != would treat distinct
            // numeric-looking ids (e.g. "1e3" and "1000") as equal.
            if ((string) $nonOfficialId !== (string) $esId) {
                echo "WARN: Document id($nonOfficialId) does not match _id($esId)\n";
                // prefer the unofficial id
                $hit['_id'] = $nonOfficialId;
            }
            // strip the not-allowed id field
            unset($hit['_source']['id']);
        }
        // end custom

        $insertString .= arrToBulkString($hit, $dIndex, $dType);

        $totalDocsReceved++;
        if ($totalDocsReceved % 1000 == 0) {
            echo "Moved $totalDocsReceved docs\n";
        }
    }

    sendBulk($insertString, $dUrl, $dPort, $dUser, $dPass);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment