Skip to content

Instantly share code, notes, and snippets.

@9072997
Created December 8, 2017 03:54
Show Gist options
  • Save 9072997/e59e98ef0d72feadb025afdd05b14113 to your computer and use it in GitHub Desktop.
Save 9072997/e59e98ef0d72feadb025afdd05b14113 to your computer and use it in GitHub Desktop.
PHP script to re-index an Elasticsearch index (with only one type) and strip the `id` field from each document (storing it in `_id`)
{
"mappings": {
"my_new_type" : {
"properties" : {
"name" : {
"type" : "string",
"analyzer": "english"
},
"age" : {
"type" : "long"
},
"last_updated" : {
"type" : "long"
}
}
}
}
}
#!/usr/bin/env php
<?php
// public domain, requires php curl extensions
//
// Documents requested per shard on each scroll page. The scan search sends
// this value as `size`, which applies per shard, so each batch holds up to
// this value times the number of primary shards of the source index:
// if the source cluster has 5 primary shards
// setting this to 10 will result in 50 documents being moved
// for each batch
$docsPerShard = 250;
// source cluster (read side)
// $sUrl must not end with '/': request paths are appended as "$sUrl/...".
// $sPort is passed to CURLOPT_PORT. NOTE(review): an empty string presumably
// lets curl use the URL scheme's default port - confirm.
// $sUser/$sPass are sent to curl as "user:pass" credentials.
// $sIndex/$sType select the documents that get copied.
$sUrl = 'https://foo.qb0x.com';
$sPort = '';
$sUser = '';
$sPass = '';
$sIndex = '';
$sType = '';
// dest cluster (write side); same meaning as the source settings above.
// WARNING: if $dIndex already exists on this cluster it is DELETED during setup.
// $dType is the _type written on every copied document; a mapping file, if
// one is used, must describe this type name.
$dUrl = 'https://bar.qb0x.com';
$dPort = '';
$dUser = '';
$dPass = '';
$dIndex = '';
$dType = '';
/**
 * Print context just before the script dies on an error.
 *
 * When the caller supplied a description of what it was doing, that text is
 * printed after two blank lines; otherwise a PHP backtrace is printed so the
 * failure can still be located.
 *
 * @param string|null $stateInfo Description of the current operation, or NULL.
 */
function echoOrTrace($stateInfo) {
    if($stateInfo !== NULL) {
        echo "\n\n{$stateInfo}\n";
        return;
    }
    debug_print_backtrace();
}
/**
 * Perform one HTTP request with cURL and return the raw response body.
 *
 * Any transport-level failure reported by curl_errno() is fatal: the state
 * info (or a backtrace) is printed and the script exits with the cURL error
 * code and message. HTTP status codes are NOT checked here; callers inspect
 * the returned body themselves.
 *
 * @param string      $method     HTTP verb, sent via CURLOPT_CUSTOMREQUEST.
 * @param string      $url        Full request URL.
 * @param int|string  $port       Value for CURLOPT_PORT.
 * @param string|null $body       Request body (CURLOPT_POSTFIELDS), or NULL for none.
 * @param string|null $authString "user:pass" for CURLOPT_USERPWD, or NULL for none.
 * @param string|null $stateInfo  Context printed if the request fails.
 * @return string Response body.
 */
function httpRequest($method, $url, $port = 80, $body = NULL, $authString = NULL, $stateInfo = NULL) {
    $ch = curl_init($url);
    curl_setopt($ch, CURLOPT_PORT, $port);
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
    curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method);
    if(!is_null($authString)) curl_setopt($ch, CURLOPT_USERPWD, $authString);
    if(!is_null($body)) curl_setopt($ch, CURLOPT_POSTFIELDS, $body);
    curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 600); // 10 minutes to connect
    curl_setopt($ch, CURLOPT_TIMEOUT, 600); // 10 minutes total timeout
    $responseStr = curl_exec($ch);
    if($errorNum = curl_errno($ch)) {
        echoOrTrace($stateInfo);
        $errorMessage = curl_strerror($errorNum);
        // curl_error() names the specific cause (host, port, TLS detail, ...),
        // curl_strerror() only the generic description of the code
        $errorDetail = curl_error($ch);
        // the original message interpolated the undefined $errno here,
        // so the error code was never shown
        die("cURL error ({$errorNum}):{$errorMessage}"
            . ($errorDetail !== '' ? " - {$errorDetail}" : '')
            . "\n");
    }
    return $responseStr;
}
/**
 * Build the URL that opens a scan/scroll search over all of $index/$type.
 *
 * Query parameters: match every document, keep the scroll context alive for
 * 15 minutes, use the "scan" search type (results in no particular order),
 * and request $numDocs hits per batch, which for a scan search is per shard.
 *
 * @param string     $url     Base cluster URL, without a trailing slash.
 * @param string     $index   Source index name.
 * @param string     $type    Source document type.
 * @param int|string $numDocs Documents per shard per batch.
 * @return string
 */
function searchUrlBuilder($url, $index, $type, $numDocs) {
    $query = implode('&', [
        'q=*',              // match all documents
        'scroll=15m',       // scroll context timeout, pushed back 15 minutes from now
        'search_type=scan', // don't bother returning results in any order
        "size=$numDocs",    // because this is a scan search, this is per shard
    ]);
    return sprintf('%s/%s/%s/_search?%s', $url, $index, $type, $query);
}
/**
 * Build the URL that fetches the next page of an open scroll.
 *
 * The scroll id is percent-encoded so that any reserved character it may
 * contain ('+', '/', '=', '&', ...) reaches Elasticsearch unchanged. A raw '+'
 * in a query string, for example, is decoded back as a space.
 *
 * @param string $url      Base cluster URL, without a trailing slash.
 * @param string $scrollId Scroll id returned by the previous search/scroll call.
 * @return string
 */
function scrollUrlBuilder($url, $scrollId) {
    return "$url/_search/scroll?scroll=15m&scroll_id=" . rawurlencode($scrollId);
}
/**
 * Decode a search/scroll response body and verify that it is usable.
 *
 * The script exits, after dumping the offending response and the state info
 * (or a backtrace), when the body:
 * - is not a JSON object;
 * - is an Elasticsearch error, or lacks the "_shards"/"hits" sections;
 * - reports failed shards;
 * - matches no documents at all.
 *
 * @param string      $json      Raw response body.
 * @param string|null $stateInfo Context printed on failure; NULL prints a backtrace.
 * @return array The decoded response as an associative array.
 */
function parseResults($json, $stateInfo = NULL) {
    $data = json_decode($json, true);
    if(!is_array($data)) { // not JSON at all, or JSON that is not an object
        echo "\n\n=== var_dump(\$json) ===\n";
        var_dump($json);
        echoOrTrace($stateInfo);
        die("FAILED TO DECODE JSON");
    } elseif(isset($data['error']) || !isset($data['_shards']['failed'], $data['hits']['total'])) {
        // An error body (missing index, expired scroll, auth failure, ...) has
        // no _shards/hits. Reading those keys would yield NULL, and the
        // response would then be misreported as "NO DOCUMENTS FOUND".
        echo "\n\n=== var_dump(\$json) ===\n";
        var_dump($json);
        echo "\n\n=== var_dump(\$data) ===\n";
        var_dump($data);
        echoOrTrace($stateInfo);
        die("UNEXPECTED RESPONSE FROM ELASTICSEARCH\n");
    } elseif($data['_shards']['failed'] != 0) { // failed shards
        $failedShardCount = $data['_shards']['failed'];
        echo "\n\n=== var_dump(\$json) ===\n";
        var_dump($json);
        echo "\n\n=== var_dump(\$data) ===\n";
        var_dump($data);
        echoOrTrace($stateInfo);
        die("$failedShardCount SHARDS FAILED\n");
    } elseif($data['hits']['total'] == 0) { // no hits
        echo "\n\n=== var_dump(\$json) ===\n";
        var_dump($json);
        echo "\n\n=== var_dump(\$data) ===\n";
        var_dump($data);
        echoOrTrace($stateInfo);
        die("NO DOCUMENTS FOUND\n");
    }
    return $data;
}
/**
 * Return the scroll id carried by a decoded search/scroll response.
 *
 * If the response has no "_scroll_id", the response and the state info (or a
 * backtrace) are dumped and the script exits.
 *
 * @param array       $resultArr Decoded response, as returned by parseResults().
 * @param string|null $stateInfo Context printed on failure; NULL prints a backtrace.
 * @return string The scroll id to use for the next page.
 */
function nextScrollId($resultArr, $stateInfo = NULL) {
    if(!isset($resultArr['_scroll_id'])) { // no scroll id returned
        echo "\n\n=== var_dump(\$resultArr) ===\n";
        // previously `$$resultArr` (a variable-variable lookup), which dumped
        // nothing useful instead of the response itself
        var_dump($resultArr);
        echoOrTrace($stateInfo);
        die("NO NEW SCROLL ID FOUND\n");
    }
    return $resultArr['_scroll_id'];
}
/**
 * Render one document as the two newline-terminated lines of an
 * Elasticsearch bulk "index" operation: the action/metadata line, then the
 * document source.
 *
 * Both lines are produced with json_encode(). Ids and index/type names that
 * contain quotes or backslashes are therefore escaped instead of corrupting
 * the bulk body.
 *
 * @param array  $docArr   A search hit; only its "_id" and "_source" are used.
 * @param string $newIndex Destination index name.
 * @param string $newType  Destination document type.
 * @return string "<action json>\n<source json>\n"
 */
function arrToBulkString($docArr, $newIndex, $newType) {
    $docId = (string)$docArr['_id'];
    $actionLine = json_encode(['index' => [
        '_index' => (string)$newIndex,
        '_type' => (string)$newType,
        '_id' => $docId,
    ]]);
    if($actionLine === false) {
        die("FAILED TO ENCODE BULK ACTION FOR DOCUMENT $docId: " . json_last_error_msg() . "\n");
    }
    // json_decode(..., true) turns an empty object, or one whose keys are
    // "0", "1", ..., into a PHP list, which json_encode() would emit as a JSON
    // array. Casting to object keeps the top-level source a JSON object.
    $sourceLine = json_encode((object)$docArr['_source']);
    if($sourceLine === false) {
        // e.g. invalid UTF-8; an empty line would corrupt the whole bulk body
        die("FAILED TO ENCODE SOURCE OF DOCUMENT $docId: " . json_last_error_msg() . "\n");
    }
    return $actionLine . "\n" . $sourceLine . "\n";
}
/**
 * Check a _bulk response and exit the script if anything was not indexed.
 *
 * Three failure shapes are handled:
 * - a body that is not a JSON object;
 * - a request rejected as a whole (an error object with no "errors" flag);
 * - a response whose "errors" flag says one or more items failed.
 * In each case the response and the state info (or a backtrace) are dumped
 * first.
 *
 * @param string      $json      Raw _bulk response body.
 * @param string|null $stateInfo Context printed on failure; NULL prints a backtrace.
 */
function dieOnBulkErrors($json, $stateInfo = NULL) {
    $data = json_decode($json, true);
    if(!is_array($data)) { // not JSON at all, or JSON that is not an object
        echo "\n\n=== var_dump(\$json) ===\n";
        var_dump($json);
        echoOrTrace($stateInfo);
        die("FAILED TO DECODE JSON");
    } elseif(isset($data['error']) || !array_key_exists('errors', $data)) {
        // Whole request rejected (malformed body, request too large, auth, ...).
        // Without this branch the missing "errors" key reads as NULL and the
        // batch would be silently treated as indexed.
        echo "\n\n=== var_dump(\$json) ===\n";
        var_dump($json);
        echo "\n\n=== var_dump(\$data) ===\n";
        var_dump($data);
        echoOrTrace($stateInfo);
        die("BULK REQUEST FAILED\n");
    } elseif($data['errors']) { // some failed inserts
        echo "\n\n=== var_dump(\$json) ===\n";
        var_dump($json);
        echo "\n\n=== var_dump(\$data) ===\n";
        var_dump($data);
        echoOrTrace($stateInfo);
        die("ONE OR MORE INSERTS FAILED\n");
    }
}
/**
 * POST a newline-delimited bulk body to the cluster's _bulk endpoint and exit
 * the script if the response reports any failure (see dieOnBulkErrors()).
 *
 * @param string     $insertString Bulk body: action and source lines, each newline-terminated.
 * @param string     $url          Base cluster URL, without a trailing slash.
 * @param int|string $port         Port passed to httpRequest().
 * @param string     $username     HTTP auth user name.
 * @param string     $password     HTTP auth password.
 */
function sendBulk($insertString, $url, $port, $username, $password) {
    // "{$url}" rather than "${url}": the latter form is deprecated since PHP 8.2
    $response = httpRequest('POST', "{$url}/_bulk", $port, $insertString, "$username:$password");
    dieOnBulkErrors($response);
}
/**
 * Fetch the next page of an open scroll and return the raw response body.
 *
 * @param string     $url      The original base URL, without the endpoint path.
 * @param int|string $port     Port passed to httpRequest().
 * @param string     $scrollId Scroll id from the previous response.
 * @param string     $user     HTTP auth user name.
 * @param string     $pass     HTTP auth password.
 * @return string Raw JSON response body.
 */
function getNextPage($url, $port, $scrollId, $user, $pass) {
    return httpRequest('GET', scrollUrlBuilder($url, $scrollId), $port, NULL, "{$user}:{$pass}");
}
////////////
// setup //
////////////
/**
 * Exit unless an index-level admin response (DELETE / PUT index) carries an
 * "acknowledged" flag. An explicit "acknowledged": false only prints a warning.
 *
 * @param string $json Raw response body.
 * @param string $what Upper-case description of the request, used in messages.
 */
function dieUnlessAcknowledged($json, $what) {
    $data = json_decode($json, true);
    if(!is_array($data) || !array_key_exists('acknowledged', $data)) {
        echo "\n\n=== var_dump(\$json) ===\n";
        var_dump($json);
        die("FAILED TO $what\n");
    }
    if(!$data['acknowledged']) {
        echo "WARN: $what WAS NOT ACKNOWLEDGED\n";
    }
}
// Validate the command line before touching either cluster. The steps below
// delete the dest index, so --help or a bad invocation must never reach them.
if(count($argv) !== 2 || $argv[1] === '--help') {
    die("Usage: {$argv[0]} {<mappingFile> | --dynamic}\n" .
        "Note: the mappings file must have the correct dest type name\n");
}
// Read the mapping file up front as well (unless elasticsearch should
// generate mappings), so a missing or unreadable file also fails before
// anything is deleted. NULL means "dynamic".
$mapping = NULL;
if($argv[1] !== '--dynamic') {
    if(!file_exists($argv[1])) {
        die("File {$argv[1]} does not exist\n");
    }
    $mapping = file_get_contents($argv[1]);
    if($mapping === false) {
        die("File {$argv[1]} could not be read\n");
    }
}
// Check whether our dest index already exists, and delete it if it does.
$indices = json_decode(
    httpRequest('GET', "$dUrl/_cat/indices?format=json", $dPort, NULL, "$dUser:$dPass"),
    true
);
if(!is_array($indices) || isset($indices['error'])) {
    // Without a usable listing we cannot tell whether the dest index must be
    // removed; stop instead of writing into a possibly stale index.
    echo "\n\n=== var_dump(\$indices) ===\n";
    var_dump($indices);
    die("COULD NOT LIST INDICES ON THE DEST CLUSTER\n");
}
foreach($indices as $index) {
    if(isset($index['index']) && $index['index'] === $dIndex) {
        dieUnlessAcknowledged(
            httpRequest('DELETE', "$dUrl/$dIndex", $dPort, NULL, "$dUser:$dPass"),
            "DELETE DEST INDEX $dIndex"
        );
        break;
    }
}
// Push the mapping, creating the dest index with it. In --dynamic mode the
// index is left for the first bulk request to create.
if(!is_null($mapping)) {
    dieUnlessAcknowledged(
        httpRequest('PUT', "$dUrl/$dIndex", $dPort, $mapping, "$dUser:$dPass"),
        "CREATE DEST INDEX $dIndex WITH MAPPING"
    );
}
//////////////////////
// main code begins //
//////////////////////
$searchUrl = searchUrlBuilder($sUrl, $sIndex, $sType, $docsPerShard);
// the initial scan request returns no documents, only a scroll id
$result = httpRequest('GET', $searchUrl, $sPort, NULL, "$sUser:$sPass");
// decode into an associative array; dies on errors or when nothing matches
$resultArr = parseResults($result);
$scrollId = nextScrollId($resultArr);
$totalDocsReceived = 0;
for(;/*ever*/;) { // or until we die()
    echo '.'; // progress bar of sorts
    $result = getNextPage($sUrl, $sPort, $scrollId, $sUser, $sPass);
    $resultArr = parseResults($result);
    $scrollId = nextScrollId($resultArr);
    $hits = $resultArr['hits']['hits'];
    if(count($hits) == 0) {
        // an empty page means the scroll is exhausted
        echo "Moved $totalDocsReceived docs\n";
        die("Looks like we are done\n");
    }
    $insertString = '';
    foreach($hits as $hit) {
        // Custom handling for this specific dataset: documents carry their own
        // "id" field, which becomes the new _id and is removed from the source.
        if(isset($hit['_source']['id'])) {
            $esId = $hit['_id'];
            $nonOfficialId = $hit['_source']['id'];
            // Compare as strings. A loose != treats e.g. "05" and 5, or "1e3"
            // and "1000", as equal, which would silently keep the wrong _id.
            if((string)$nonOfficialId !== (string)$esId) {
                echo "WARN: Document id($nonOfficialId) does not match _id($esId)\n";
                // prefer the unofficial id
                $hit['_id'] = $nonOfficialId;
            }
            // strip the not-allowed id field
            unset($hit['_source']['id']);
        }
        // end custom
        $insertString .= arrToBulkString($hit, $dIndex, $dType);
        $totalDocsReceived++;
        if($totalDocsReceived % 1000 == 0) {
            echo "Moved $totalDocsReceived docs\n";
        }
    }
    sendBulk($insertString, $dUrl, $dPort, $dUser, $dPass);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment