Created
April 20, 2015 10:49
-
-
Save motin/ddb2a609a04f383ebf5d to your computer and use it in GitHub Desktop.
Old console command, from a project a couple of years ago, for maintaining Elasticsearch indexes of content stored in RDBMS tables, using Yii 1.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php
/**
 * Console command that maintains Elasticsearch indexes whose contents come
 * from database-backed models.
 *
 * Every index operation targets "<index ref> + suffix"; an alias (see
 * actionActivateSuffix) decides which suffixed index is the live one.
 *
 * Relies on helpers that are not defined in this class and are presumably
 * provided by AppConsoleCommand: status(), getCriteria() and
 * loadControllerByModelRef().
 */
class ElasticSearchCommand extends AppConsoleCommand
{
    public $model;

    /**
     * Fills an Elasticsearch index from database contents.
     *
     * The rows selected by the criteria are fetched in one query and then
     * converted and bulk-indexed in batches of at most $bulkSize rows.
     *
     * @param string $modelRef Model class name; must expose a static model() method.
     * @param string $suffix Non-empty index suffix.
     * @param mixed $pageSize Passed through to getCriteria().
     * @param mixed $currentPage Passed through to getCriteria().
     * @param mixed $orderDesc Passed through to getCriteria().
     * @param mixed $segment Passed through to getCriteria().
     * @param mixed $curl_debug When truthy, the CURL_DEBUG constant is defined.
     * @param mixed $bulkSize Maximum number of records per bulk request (values below 1 are treated as 1).
     * @throws Exception When $suffix is empty.
     */
    public function actionFillindex($modelRef, $suffix, $pageSize = 5, $currentPage = 1, $orderDesc = -1, $segment = false, $curl_debug = false, $bulkSize = 1000)
    {
        $this->enableCurlDebugIfRequested($curl_debug);

        // For a temporary workaround
        //define('ONLY_USE_BASE_RELATIONS_FOR_TRIP_OBJECT', true);

        // Pre-caution
        $this->ensureSuffix($suffix);

        // Use to activate Elastica debugging
        //define('DEBUG', true);

        $this->status("Loading controller and model");
        // The return value is not needed here; the call is kept because it
        // may be what makes the model class available. NOTE(review): confirm.
        $this->loadControllerByModelRef($modelRef);

        $this->status("Applying pagination etc and getting the current records");
        $criteria = $this->getCriteria($modelRef, $pageSize, $currentPage, $orderDesc, $segment);

        // Debugging - including only certain records
        //$criteria->condition = "geonameid = 4046255";
        //$criteria->condition = "iso_alpha2 IN ('US','SE')";

        $model = $modelRef::model();
        $records = $model->getCommandBuilder()
            ->createFindCommand($model->tableSchema, $criteria)
            ->queryAll();

        $index = $this->getDefaultIndex($modelRef, $suffix);

        // Experience shows that elasticsearch doesn't cope well with more than
        // around 5-10k documents in each bulk operation under load.
        // Clamp to 1 so that the batch loop below always advances.
        $limit = max(1, (int) $bulkSize);
        $total = count($records);
        $counter = 1;

        // Walk the result set by offset instead of repeatedly re-slicing it,
        // so the loop ends exactly when the last batch has been processed.
        for ($start = 0; $start < $total; $start += $limit) {
            $this->status("Converting at most $limit records to elastic search documents - #$counter");
            $docsByType = $this->buildDocumentsByType($model, array_slice($records, $start, $limit));

            $this->status("Found " . count($docsByType) . " different types of records");
            $this->bulkAddDocuments($index, $docsByType);

            $counter++;
        }

        if (isset($pageSize)) {
            $this->status("Done execution with pageSize $pageSize, current page $currentPage. Memory usage: " . round(memory_get_usage() / 1024 / 1024, 2) . " MiB");
            echo "\n\n";
        }
    }

    /**
     * Applies the model's index mappings to the suffixed index.
     *
     * @param string $modelRef Model class name.
     * @param string $suffix Non-empty index suffix.
     * @throws Exception When $suffix is empty.
     */
    public function setMappings($modelRef, $suffix)
    {
        // Pre-caution
        $this->ensureSuffix($suffix);

        $index = Yiilastica::getIndexByModelRef($modelRef, $suffix);
        Yiilastica::setIndexMappingsByModel($index, $modelRef::model());
    }

    /**
     * Guards index operations against running without a suffix.
     *
     * Note that empty() also rejects "0" and 0.
     *
     * @param mixed $suffix
     * @throws Exception When $suffix is empty.
     */
    public function ensureSuffix($suffix)
    {
        if (empty($suffix)) {
            throw new Exception("All index-operations should be performed against a non-empty suffix.");
        }
    }

    /**
     * Points the model's index alias at the index with the given suffix,
     * replacing any existing alias.
     *
     * @param string $modelRef Model class name.
     * @param string $suffix Non-empty index suffix.
     * @param mixed $curl_debug When truthy, the CURL_DEBUG constant is defined.
     * @throws Exception When $suffix is empty.
     */
    public function actionActivateSuffix($modelRef, $suffix, $curl_debug = false)
    {
        $this->enableCurlDebugIfRequested($curl_debug);

        // Pre-caution
        $this->ensureSuffix($suffix);

        $index = Yiilastica::getIndexByModelRef($modelRef, $suffix);
        // Second argument: replace an existing alias.
        $index->addAlias(Yiilastica::getIndexRefByModelRef($modelRef), true);
    }

    /**
     * Deletes and re-creates an index
     * Mappings needs to be defined directly after creation.
     * After a mapping has been set in an index it cannot be altered.
     * (According to well-renowned clintongormley@#elasticsearch around 2011-06-09)
     *
     * @param string $modelRef Model class name.
     * @param string $suffix Non-empty index suffix.
     * @param mixed $curl_debug When truthy, the CURL_DEBUG constant is defined.
     * @throws Exception When $suffix is empty.
     */
    public function actionInitialize($modelRef, $suffix, $curl_debug = false)
    {
        $this->enableCurlDebugIfRequested($curl_debug);

        // Pre-caution
        $this->ensureSuffix($suffix);

        $this->status("Creating empty index for $modelRef (suffix: '$suffix')");
        $index = $this->getDefaultIndex($modelRef, $suffix);
        // Second argument: recreate, i.e. drop an existing index first
        // (used instead of an explicit $index->delete()).
        $index->create(array(), true);

        $this->status("Setting mappings for $modelRef (suffix: '$suffix')");
        $this->setMappings($modelRef, $suffix);

        $this->actionRefresh($modelRef, $suffix);
    }

    /**
     * Optimizes and then refreshes the index.
     *
     * Unlike the other actions this one accepts an empty suffix and does not
     * call ensureSuffix(); that behaviour is kept for existing callers.
     *
     * @param string $modelRef Model class name.
     * @param string $suffix Index suffix, may be empty.
     */
    public function actionRefresh($modelRef, $suffix = '')
    {
        $index = $this->getDefaultIndex($modelRef, $suffix);

        $this->status("Optimizing index");
        $index->optimize();

        $this->status("Refreshing index");
        $index->refresh();
    }

    /**
     * Returns the index object for the given model and suffix.
     *
     * @param string $modelRef Model class name.
     * @param string $suffix Index suffix.
     * @return mixed Whatever Yiilastica::getIndexByModelRef() returns.
     */
    private function getDefaultIndex($modelRef, $suffix = '')
    {
        $this->status("Loading elastic search libs");
        // Result intentionally unused; presumably the call bootstraps the
        // Elastica client/libraries. NOTE(review): confirm before removing.
        Yiilastica::getClient();

        $this->status("Loading index");
        return Yiilastica::getIndexByModelRef($modelRef, $suffix);
    }

    /**
     * Defines the CURL_DEBUG constant when requested.
     *
     * Guarded with defined() so that a second request within the same process
     * does not trigger a "constant already defined" error.
     *
     * @param mixed $curl_debug Truthy to enable.
     */
    private function enableCurlDebugIfRequested($curl_debug)
    {
        if ($curl_debug && !defined("CURL_DEBUG")) {
            define("CURL_DEBUG", true);
        }
    }

    /**
     * Converts raw records to documents, grouped by their type reference.
     *
     * Records whose conversion throws one of the two known "skip" exceptions,
     * or whose type reference comes back as "ignore", are left out.
     *
     * @param mixed $model Model instance exposing getElasticaDocument().
     * @param array $records Rows as returned by queryAll().
     * @return array Map of type reference => list of documents.
     */
    private function buildDocumentsByType($model, array $records)
    {
        $docsByType = array();

        foreach ($records as $record) {
            // Presumably filled in by reference by getElasticaDocument().
            $typeRef = null;

            try {
                $doc = $model->getElasticaDocument($record, $typeRef);
            } catch (InvalidAccomplishmentFormatException $e) {
                // ignore this document
                continue;
            } catch (WallPostFromNeitherTravelerNorLocal $e) {
                // ignore this document
                continue;
            }

            if ($typeRef === "ignore") {
                continue;
            }

            $docsByType[$typeRef][] = $doc;
        }

        return $docsByType;
    }

    /**
     * Bulk-adds the grouped documents to the index, one request per type.
     *
     * Bulk failures are reported on the console and do not abort the run.
     *
     * @param mixed $index Index object exposing getType().
     * @param array $docsByType Map of type reference => list of documents.
     */
    private function bulkAddDocuments($index, array $docsByType)
    {
        foreach ($docsByType as $typeRef => $docs) {
            $type = $index->getType($typeRef);

            try {
                $this->status("Bulk-adding " . count($docs) . " documents of type $typeRef to the index");
                $type->addDocuments($docs);
            } catch (Elastica_Exception_BulkResponse $e) {
                $this->status('Got below failures from the bulk adding process...');
                var_dump($e->getFailures());
                $this->status('Got above failures from the bulk adding process...');
            }
            // We could also catch Elastica_Exception_Client throwing curl errors in the form of Unknown error:52 :56 etc
        }
    }

    /**
     * Reads positional arguments into the by-reference parameters and applies
     * defaults for anything missing.
     *
     * Not called from within this class; kept as-is.
     *
     * @param array $args Positional arguments: modelRef, pageSize, currentPage, orderDesc, segment.
     * @param mixed $modelRef
     * @param mixed $pageSize Defaults to 5 when missing or empty.
     * @param mixed $currentPage Defaults to 1 when missing or empty.
     * @param mixed $orderDesc Defaults to -1 when not set.
     * @param mixed $segment Defaults to false when not set.
     * @throws Exception When no modelRef is available.
     */
    private function getopt($args, &$modelRef, &$pageSize, &$currentPage, &$orderDesc, &$segment)
    {
        if (isset($args[0])) {
            $modelRef = $args[0];
        }
        if (empty($modelRef)) {
            throw new Exception("Missing modelRef argument");
        }

        if (isset($args[1])) {
            $pageSize = intval($args[1]);
        }
        if (empty($pageSize)) {
            $pageSize = 5;
        }

        if (isset($args[2])) {
            $currentPage = intval($args[2]);
        }
        if (empty($currentPage)) {
            $currentPage = 1;
        }

        if (isset($args[3])) {
            $orderDesc = intval($args[3]);
        }
        if (!isset($orderDesc)) {
            $orderDesc = -1;
        }

        if (isset($args[4])) {
            $segment = intval($args[4]);
        }
        if (!isset($segment)) {
            $segment = false;
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment