Skip to content

Instantly share code, notes, and snippets.

@motin
Created April 20, 2015 10:49
Show Gist options
  • Save motin/ddb2a609a04f383ebf5d to your computer and use it in GitHub Desktop.
Save motin/ddb2a609a04f383ebf5d to your computer and use it in GitHub Desktop.
Old command from a project a couple of years ago that maintains Elasticsearch indexes of content stored in RDBMS tables, using Yii 1.
<?php
/**
 * Console command that maintains ElasticSearch indexes of content stored in database tables.
 *
 * Every index operation works against an index resolved by Yiilastica from a model
 * reference plus a suffix. Progress is reported through status(), which, like
 * loadControllerByModelRef() and getCriteria(), is not defined in this class
 * (presumably inherited from AppConsoleCommand - confirm).
 */
class ElasticSearchCommand extends AppConsoleCommand
{
    /**
     * Not read or written anywhere in this class.
     * NOTE(review): presumably set by the console runner or used by the parent class - confirm.
     */
    public $model;

    /**
     * Fills the ElasticSearch index from database contents.
     *
     * Fetches the rows selected by getCriteria(), converts them to Elastica documents
     * in batches and bulk-adds each batch to the index, grouped by document type.
     *
     * @param string $modelRef Model class name; must provide a static model() method
     * @param string $suffix Index suffix, must be non-empty
     * @param int $pageSize Forwarded to getCriteria()
     * @param int $currentPage Forwarded to getCriteria()
     * @param int $orderDesc Forwarded to getCriteria()
     * @param mixed $segment Forwarded to getCriteria()
     * @param bool $curl_debug When truthy, the CURL_DEBUG constant is defined as true
     * @throws Exception when $suffix is empty
     */
    public function actionFillindex($modelRef, $suffix, $pageSize = 5, $currentPage = 1, $orderDesc = -1, $segment = false, $curl_debug = false)
    {
        $this->defineCurlDebug($curl_debug);

        // Precaution
        $this->ensureSuffix($suffix);

        // To activate Elastica debugging, define the DEBUG constant as true here.

        $this->status("Loading controller and model");
        // The returned controller is not used below; the call itself is kept because
        // it may have loading side effects the rest of the method relies on.
        $this->loadControllerByModelRef($modelRef);

        $this->status("Applying pagination etc and getting the current records");
        $criteria = $this->getCriteria($modelRef, $pageSize, $currentPage, $orderDesc, $segment);

        // For debugging, the selection can be narrowed here,
        // e.g. $criteria->condition = "iso_alpha2 IN ('US','SE')";

        $model = $modelRef::model();
        $records = $model->getCommandBuilder()
            ->createFindCommand($model->tableSchema, $criteria)
            ->queryAll();

        $index = $this->getDefaultIndex($modelRef, $suffix);

        // Experience shows that elasticsearch doesn't cope well with more than
        // around 5-10k documents in each bulk operation under load
        $limit = 1000;
        $total = count($records);
        $counter = 1;

        // Walk the result set in fixed-size windows. The loop ends exactly when the
        // records are exhausted, so no status line is printed for a non-existent batch.
        for ($offset = 0; $offset < $total; $offset += $limit) {
            $this->status("Converting at most $limit records to elastic search documents - #$counter");
            $docs_by_type = $this->groupDocumentsByType($model, array_slice($records, $offset, $limit));

            $this->status("Found " . count($docs_by_type) . " different types of records");
            $this->bulkAddDocuments($index, $docs_by_type);

            $counter++;
        }

        if (isset($pageSize)) {
            $this->status("Done execution with pageSize $pageSize, current page $currentPage. Memory usage: " . round(memory_get_usage() / 1024 / 1024, 2) . " MiB");
            echo "\n\n";
        }
    }

    /**
     * Sets the index mappings for a model on the suffixed index.
     *
     * @param string $modelRef Model class name; must provide a static model() method
     * @param string $suffix Index suffix, must be non-empty
     * @throws Exception when $suffix is empty
     */
    public function setMappings($modelRef, $suffix)
    {
        // Precaution
        $this->ensureSuffix($suffix);

        $index = Yiilastica::getIndexByModelRef($modelRef, $suffix);
        Yiilastica::setIndexMappingsByModel($index, $modelRef::model());
    }

    /**
     * Guards index operations against being run without a suffix.
     *
     * @param mixed $suffix Value to check; anything PHP's empty() considers empty is rejected
     * @throws Exception when $suffix is empty
     */
    public function ensureSuffix($suffix)
    {
        if (empty($suffix)) {
            throw new Exception("All index-operations should be performed against a non-empty suffix.");
        }
    }

    /**
     * Adds the alias returned by Yiilastica::getIndexRefByModelRef() to the suffixed index.
     *
     * @param string $modelRef Model class name
     * @param string $suffix Index suffix, must be non-empty
     * @param bool $curl_debug When truthy, the CURL_DEBUG constant is defined as true
     * @throws Exception when $suffix is empty
     */
    public function actionActivateSuffix($modelRef, $suffix, $curl_debug = false)
    {
        $this->defineCurlDebug($curl_debug);

        // Precaution
        $this->ensureSuffix($suffix);

        $index = Yiilastica::getIndexByModelRef($modelRef, $suffix);
        // Second argument is the "replace" flag
        $index->addAlias(Yiilastica::getIndexRefByModelRef($modelRef), true);
    }

    /**
     * Deletes and re-creates an index
     * Mappings need to be defined directly after creation.
     * After a mapping has been set in an index it cannot be altered.
     * (According to well-renowned clintongormley@#elasticsearch around 2011-06-09)
     *
     * @param string $modelRef Model class name
     * @param string $suffix Index suffix, must be non-empty
     * @param bool $curl_debug When truthy, the CURL_DEBUG constant is defined as true
     * @throws Exception when $suffix is empty
     */
    public function actionInitialize($modelRef, $suffix, $curl_debug = false)
    {
        $this->defineCurlDebug($curl_debug);

        // Precaution
        $this->ensureSuffix($suffix);

        $this->status("Creating empty index for $modelRef (suffix: '$suffix')");
        $index = $this->getDefaultIndex($modelRef, $suffix);
        // Second argument is the "recreate" flag, used instead of an explicit $index->delete()
        $index->create(array(), true);

        $this->status("Setting mappings for $modelRef (suffix: '$suffix')");
        $this->setMappings($modelRef, $suffix);

        $this->actionRefresh($modelRef, $suffix);
    }

    /**
     * Optimizes and then refreshes the index.
     *
     * Unlike the other actions, this one does not require a non-empty suffix.
     *
     * @param string $modelRef Model class name
     * @param string $suffix Index suffix, may be empty
     */
    public function actionRefresh($modelRef, $suffix = '')
    {
        $index = $this->getDefaultIndex($modelRef, $suffix);

        $this->status("Optimizing index");
        $index->optimize();

        $this->status("Refreshing index");
        $index->refresh();
    }

    /**
     * Resolves the index for a model reference and suffix.
     *
     * @param string $modelRef Model class name
     * @param string $suffix Index suffix, may be empty
     * @return mixed Whatever Yiilastica::getIndexByModelRef() returns
     */
    private function getDefaultIndex($modelRef, $suffix = '')
    {
        $this->status("Loading elastic search libs");
        // The returned client is not used; the call is kept for its side effects.
        // NOTE(review): presumably this bootstraps the Elastica libraries - confirm.
        Yiilastica::getClient();

        $this->status("Loading index");
        return Yiilastica::getIndexByModelRef($modelRef, $suffix);
    }

    /**
     * Defines the CURL_DEBUG constant as true when requested.
     *
     * Skips the definition if the constant already exists, since re-defining a
     * constant makes PHP raise a notice/warning.
     *
     * @param mixed $curl_debug Truthy to enable
     */
    private function defineCurlDebug($curl_debug)
    {
        if ($curl_debug && !defined("CURL_DEBUG")) {
            define("CURL_DEBUG", true);
        }
    }

    /**
     * Converts database rows to Elastica documents, grouped by document type.
     *
     * Rows are skipped when the conversion throws one of the two known
     * "ignorable" exceptions, or when the resulting type reference is "ignore".
     *
     * @param object $model Model instance providing getElasticaDocument($record, $typeRef)
     * @param array $records Rows to convert
     * @return array Documents indexed by type reference: array(typeRef => array(doc, ...))
     */
    private function groupDocumentsByType($model, $records)
    {
        $docs_by_type = array();

        foreach ($records as $record) {
            // Presumably filled in by reference by getElasticaDocument() - confirm
            $typeRef = null;

            try {
                $doc = $model->getElasticaDocument($record, $typeRef);
            } catch (InvalidAccomplishmentFormatException $e) {
                // ignore this document
                continue;
            } catch (WallPostFromNeitherTravelerNorLocal $e) {
                // ignore this document
                continue;
            }

            // Strict comparison so that only the literal sentinel string skips a document
            if ($typeRef === "ignore") {
                continue;
            }

            $docs_by_type[$typeRef][] = $doc;
        }

        return $docs_by_type;
    }

    /**
     * Bulk-adds documents to the index, one bulk operation per document type.
     *
     * Bulk failures are dumped to the output and do not abort the remaining types.
     *
     * @param object $index Index providing getType($typeRef)
     * @param array $docs_by_type Documents indexed by type reference
     */
    private function bulkAddDocuments($index, $docs_by_type)
    {
        foreach ($docs_by_type as $typeRef => $docs) {
            $type = $index->getType($typeRef);

            try {
                $this->status("Bulk-adding " . count($docs) . " documents of type $typeRef to the index");
                $type->addDocuments($docs);
            } catch (Elastica_Exception_BulkResponse $e) {
                $this->status('Got below failures from the bulk adding process...');
                var_dump($e->getFailures());
                $this->status('Got above failures from the bulk adding process...');
            }
            // We could also catch Elastica_Exception_Client throwing curl errors in the form of Unknown error:52 :56 etc
        }
    }

    /**
     * Reads positional arguments into the referenced variables, applying defaults.
     *
     * Argument order: modelRef, pageSize, currentPage, orderDesc, segment.
     * Not called from within this class.
     *
     * @param array $args Positional arguments
     * @param string $modelRef Receives $args[0]; required
     * @param int $pageSize Receives intval($args[1]); falls back to 5 when empty
     * @param int $currentPage Receives intval($args[2]); falls back to 1 when empty
     * @param int $orderDesc Receives intval($args[3]); falls back to -1 when unset
     * @param mixed $segment Receives intval($args[4]); falls back to false when unset
     * @throws Exception when no model reference is available
     */
    private function getopt($args, &$modelRef, &$pageSize, &$currentPage, &$orderDesc, &$segment)
    {
        if (isset($args[0])) {
            $modelRef = $args[0];
        }
        if (empty($modelRef)) {
            throw new Exception("Missing modelRef argument");
        }

        if (isset($args[1])) {
            $pageSize = intval($args[1]);
        }
        if (empty($pageSize)) {
            $pageSize = 5;
        }

        if (isset($args[2])) {
            $currentPage = intval($args[2]);
        }
        if (empty($currentPage)) {
            $currentPage = 1;
        }

        // isset() rather than empty(): 0 is kept as an explicit value for the two below
        if (isset($args[3])) {
            $orderDesc = intval($args[3]);
        }
        if (!isset($orderDesc)) {
            $orderDesc = -1;
        }

        if (isset($args[4])) {
            $segment = intval($args[4]);
        }
        if (!isset($segment)) {
            $segment = false;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment