Skip to content

Instantly share code, notes, and snippets.

@sanikolaev
Created July 12, 2022 14:55
Show Gist options
  • Save sanikolaev/13bf61bbe6c39350bded7c577216435f to your computer and use it in GitHub Desktop.
Manticore and Elasticsearch loading scripts with sharding
#!/usr/bin/php
<?php
// Elasticsearch bulk-loading benchmark.
// Recreates the "user" index with the requested number of shards, then (below)
// generates fake documents and bulk-inserts them concurrently.
// Args: <batch size> <concurrency> <docs> <shards> <multiplier>
if (count($argv) < 6) die("Usage: ".__FILE__." <batch size> <concurrency> <docs> <shards> <multiplier>\n");
require_once 'vendor/autoload.php';
$t = microtime(true);
$c = 0;

// Drop a leftover index from a previous run (a 404 on a fresh cluster is fine).
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, "http://127.0.0.1:9200/user");
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, "DELETE");
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
$res = curl_exec($ch);
curl_close($ch); // BUGFIX: handle was previously overwritten without being closed

// Recreate the index with <shards> shards and refresh disabled (faster bulk load).
// Build the body with json_encode so the shard count is a proper JSON number
// and the payload is guaranteed to be valid JSON.
$indexBody = json_encode([
    'settings' => [
        'index' => [
            'number_of_shards' => (int)$argv[4],
            'refresh_interval' => -1,
        ],
    ],
    'mappings' => [
        'properties' => [
            'name'        => ['type' => 'text'],
            'email'       => ['type' => 'keyword'],
            'description' => ['type' => 'text'],
            'age'         => ['type' => 'integer'],
            'active'      => ['type' => 'integer'],
        ],
    ],
]);
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, "http://127.0.0.1:9200/user");
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, "PUT");
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/json']);
curl_setopt($ch, CURLOPT_POSTFIELDS, $indexBody);
$res = curl_exec($ch);
// BUGFIX: the result was never checked; a dead ES node made the script
// silently "load" into nothing.
if ($res === false) die("ERROR: cannot create index: ".curl_error($ch)."\n");
curl_close($ch);
// Generate (or load from a /tmp cache) the NDJSON bulk batches of fake users.
// The cache key includes <batch size> and <docs> so repeated runs with the
// same parameters skip the expensive Faker generation.
$faker = Faker\Factory::create();
$batches = [];
echo "preparing...\n";
$error = false;
$cache_file_name = '/tmp/json_user_es_'.$argv[1].'_'.$argv[3];
$c = 0;
if (!file_exists($cache_file_name)) {
    $batch = []; // BUGFIX: was never initialized, causing count(null) below (fatal in PHP 8)
    $batches = [];
    while ($c < $argv[3]) {
        // BUGFIX: documents were hand-concatenated into JSON strings; Faker text
        // containing quotes or backslashes produced invalid NDJSON lines that
        // Elasticsearch rejected. json_encode escapes them correctly.
        $doc = json_encode([
            'name'        => $faker->name(),
            'email'       => $faker->email(),
            'description' => $faker->text(),
            'age'         => rand(10, 90),
            'active'      => rand(0, 1),
        ]);
        $batch[] = '{"index": {"_index":"user"}}'."\n".$doc;
        $c++;
        // Progress indicator every 1000 generated docs.
        if ($c % 1000 === 0) echo "\r".($c/$argv[3]*100)."% ";
        if (count($batch) == $argv[1]) {
            $batches[] = implode("\n", $batch);
            $batch = [];
        }
    }
    if ($batch) $batches[] = implode("\n", $batch); // flush the trailing partial batch
    file_put_contents($cache_file_name, serialize($batches));
} else {
    echo "found in cache\n";
    $batches = unserialize(file_get_contents($cache_file_name));
}
// Repeat the prepared batches <multiplier> times to scale the total volume.
$batchesMulti = [];
for ($n=0;$n<$argv[5];$n++) $batchesMulti = array_merge($batchesMulti, $batches);
$batches = $batchesMulti;
echo "querying...\n";
$t = microtime(true);
// Feed the batches to Elasticsearch through a curl_multi pool, keeping at most
// <concurrency> bulk requests in flight at any moment.
$mh = curl_multi_init();
$active = 0;
$c = 0;
while (true) {
    // Top up the pool while there is spare concurrency and work left.
    if ($active < $argv[2] and count($batches) > 0) {
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, "http://127.0.0.1:9200/_bulk");
        curl_setopt($ch, CURLOPT_POST, 1);
        curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/x-ndjson']);
        curl_setopt($ch, CURLOPT_POSTFIELDS, array_shift($batches)."\n");
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_multi_add_handle($mh, $ch);
    }
    $status = curl_multi_exec($mh, $active);
    // Tiny select to avoid a pure busy-wait while transfers run.
    curl_multi_select($mh, 0.000001);
    // BUGFIX: finished easy handles were never detached or closed, so memory
    // grew with every completed bulk request. Reap them here.
    while ($info = curl_multi_info_read($mh)) {
        curl_multi_remove_handle($mh, $info['handle']);
        curl_close($info['handle']);
    }
    if ($active == 0 and count($batches) == 0) break;
}
curl_multi_close($mh);
echo "finished inserting\n";
echo "Total time: ".(microtime(true) - $t)."\n";
echo round($argv[3] * $argv[5] / (microtime(true) - $t))." docs per sec\n";
#!/usr/bin/php
<?php
// Manticore bulk-loading benchmark, the counterpart of the Elasticsearch
// script above. Uses async mysqli queries against the SQL endpoint (port 9306).
// Args: <batch size> <concurrency> <docs> <shards> <multiplier>
if (count($argv) < 6) die("Usage: ".__FILE__." <batch size> <concurrency> <docs> <shards> <multiplier>\n");
require_once 'vendor/autoload.php';
// Waits for an idle mysql connection from the global pool, dispatches $query
// on it asynchronously (rewritten to target shard table user<$shard>), and
// returns. Returns false when a previously dispatched query failed on some
// connection (dead connections are evicted from the pool); true otherwise.
function process($query, $shard) {
// Route the insert to the requested shard table, e.g. "insert into user3 ...".
$query = str_replace('insert into user', 'insert into user'.$shard, $query);
global $all_links; // pool of mysqli connections
global $requests;  // per-connection dispatch timestamp of the in-flight query
// Fast path: a connection with no recorded in-flight query is free — use it.
foreach ($all_links as $k=>$link) {
if (@$requests[$k]) continue; // busy with a previous async query
mysqli_query($link, $query, MYSQLI_ASYNC);
@$requests[$k] = microtime(true);
return true;
}
// Slow path: all connections busy — poll until one finishes, reap its result,
// then immediately reuse that connection for the pending query.
do {
$links = $errors = $reject = array();
foreach ($all_links as $link) {
$links[] = $errors[] = $reject[] = $link;
}
// mysqli_poll narrows $links (by reference) to the readable connections;
// wait up to 1000 microseconds per iteration.
$count = @mysqli_poll($links, $errors, $reject, 0, 1000);
if ($count > 0) {
foreach ($links as $j=>$link) {
$res = @mysqli_reap_async_query($links[$j]);
// Map the polled handle back to its index $i in the original pool.
foreach ($all_links as $i=>$link_orig) if ($all_links[$i] === $links[$j]) break;
if ($link->error) {
echo "ERROR: {$link->error}\n";
// A failed query may mean the connection itself is gone; verify and evict.
if (!mysqli_ping($link)) {
echo "ERROR: mysql connection is down, removing it from the pool\n";
unset($all_links[$i]); // remove the original link from the pool
unset($requests[$i]); // and from the $requests too
}
return false;
}
if ($res === false and !$link->error) continue; // nothing reaped on this handle
if (is_object($res)) {
mysqli_free_result($res); // INSERTs return true, but free a result set just in case
}
// Reuse the freed connection for the pending query.
$requests[$i] = microtime(true);
mysqli_query($link, $query, MYSQLI_ASYNC); // making next query
return true;
}
};
} while (true);
return true;
}
// Open <concurrency> connections to Manticore (MySQL protocol, port 9306)
// and (re)create the per-shard tables plus the distributed "user" table.
$all_links = [];
$requests = [];
$c = 0;
while (count($all_links) < $argv[2]) {
    $link = @mysqli_connect('127.0.0.1', '', '', '', 9306);
    if (mysqli_connect_error()) die("Cannot connect to Manticore\n");
    $all_links[] = $link;
}
// Recreate one plain table per shard and collect the local-table clauses
// for the distributed table definition.
$setup = $all_links[0];
$dist = [];
for ($shard = 1; $shard <= $argv[4]; $shard++) {
    mysqli_query($setup, 'drop table if exists user'.$shard);
    mysqli_query($setup, 'create table user'.$shard.'(name text, email string, description text, age int, active int)');
    $dist[] = "local='user$shard'";
}
// The "user" table fans queries out across all shard tables.
mysqli_query($setup, 'drop table if exists user');
mysqli_query($setup, "create table user type='distributed' ".implode(' ', $dist));
// Generate (or load from a /tmp cache) the multi-row INSERT batches of fake
// users. The cache key includes the query prefix, <batch size> and <docs> so
// different configurations don't collide.
$batch = [];
$query_start = "insert into user(id, name, email, description, age, active) values ";
$faker = Faker\Factory::create();
echo "preparing...\n";
$error = false;
$cache_file_name = '/tmp/'.md5($query_start).'_'.$argv[1].'_'.$argv[3];
$c = 0;
if (!file_exists($cache_file_name)) {
    $batches = [];
    while ($c < $argv[3]) {
        // addslashes protects the quoted SQL literals from quotes/backslashes
        // in Faker output.
        $ar = [addslashes($faker->name()), addslashes($faker->email()), addslashes($faker->text()), rand(10,90), rand(0,1)];
        // id=0 lets Manticore auto-assign document ids.
        $batch[] = "(0,'".$ar[0]."','".$ar[1]."','".$ar[2]."',".$ar[3].",".$ar[4].")";
        $c++;
        // Progress every 1000 docs (integer modulo instead of the original
        // float floor() comparison — same effect, clearer and cheaper).
        if ($c % 1000 === 0) echo "\r".($c/$argv[3]*100)."% ";
        if (count($batch) == $argv[1]) {
            $batches[] = $query_start.implode(',', $batch);
            $batch = [];
        }
    }
    if ($batch) $batches[] = $query_start.implode(',', $batch); // trailing partial batch
    file_put_contents($cache_file_name, serialize($batches));
} else {
    echo "found in cache $cache_file_name\n";
    $batches = unserialize(file_get_contents($cache_file_name));
}
echo "querying...\n";
$t = microtime(true);
// Dispatch every batch <multiplier> times, each targeting a randomly chosen
// shard table; process() blocks until a pool connection becomes free.
for ($n=0;$n<$argv[5];$n++) {
foreach ($batches as $batch) {
if (!process($batch, rand(1, $argv[4]))) die("ERROR\n");
}
}
// wait until all the workers finish
// mysqli_poll narrows the by-ref arrays to the ready/errored/rejected handles;
// once every pooled connection shows up in one of them, no async query is
// still running and the benchmark is complete.
do {
$links = $errors = $reject = array();
foreach ($all_links as $link) $links[] = $errors[] = $reject[] = $link;
$count = @mysqli_poll($links, $errors, $reject, 0, 100);
} while (count($all_links) != count($links) + count($errors) + count($reject));
echo "finished inserting\n";
echo "Total time: ".(microtime(true) - $t)."\n";
echo round($argv[3]*$argv[5] / (microtime(true) - $t))." docs per sec\n";
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment