Created
July 12, 2022 14:55
-
-
Save sanikolaev/13bf61bbe6c39350bded7c577216435f to your computer and use it in GitHub Desktop.
Manticore Search and Elasticsearch data-loading scripts with sharding
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/php
<?php
// Elasticsearch bulk-load benchmark: argument check and (re)creation of the "user" index.
// Arguments: <batch size> <concurrency> <docs> <shards> <multiplier>
if (count($argv) < 6) die("Usage: ".__FILE__." <batch size> <concurrency> <docs> <shards> <multiplier>\n");
require_once 'vendor/autoload.php';

// The shard count ends up inside the index settings, so it has to be a positive integer;
// anything else would previously have produced an invalid JSON request body.
$shards = (int)$argv[4];
if ($shards < 1) die("<shards> must be a positive integer\n");

// Drop the index left over from a previous run. An HTTP error (e.g. 404 when the index
// does not exist yet) is expected and ignored; only a transport failure is fatal.
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, "http://127.0.0.1:9200/user");
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, "DELETE");
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
$res = curl_exec($ch);
if ($res === false) die("Cannot connect to Elasticsearch: ".curl_error($ch)."\n");

// Index definition: refresh is disabled (-1) while loading, and the field types
// mirror the documents generated later in the script.
$index_definition = [
    'settings' => [
        'index' => [
            'number_of_shards' => $shards,
            'refresh_interval' => -1,
        ],
    ],
    'mappings' => [
        'properties' => [
            'name' => ['type' => 'text'],
            'email' => ['type' => 'keyword'],
            'description' => ['type' => 'text'],
            'age' => ['type' => 'integer'],
            'active' => ['type' => 'integer'],
        ],
    ],
];

$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, "http://127.0.0.1:9200/user");
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, "PUT");
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/json']);
curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($index_definition));
$res = curl_exec($ch);
$http_code = curl_getinfo($ch, CURLINFO_HTTP_CODE);
// Without the explicitly created index the bulk load would run against an auto-created
// index with default settings, so a failure here makes the benchmark meaningless.
if ($res === false or $http_code < 200 or $http_code >= 300) {
    die("Cannot create index \"user\": ".($res === false ? curl_error($ch) : $res)."\n");
}
// Build (or load from cache) the list of bulk request bodies.
// Each element of $batches is one NDJSON payload holding up to <batch size> documents,
// every document preceded by its bulk "index" action line.
$faker = Faker\Factory::create();
$batches = [];
echo "preparing...\n";
// The cache key depends on batch size and document count only.
$cache_file_name = '/tmp/json_user_es_'.$argv[1].'_'.$argv[3];
$c = 0;
if (!file_exists($cache_file_name)) {
    $batch = [];
    while ($c < $argv[3]) {
        // json_encode() escapes quotes, backslashes and control characters in the
        // generated text; plain concatenation would emit broken JSON for such values.
        $doc = [
            'name' => $faker->name(),
            'email' => $faker->email(),
            'description' => $faker->text(),
            'age' => rand(10, 90),
            'active' => rand(0, 1),
        ];
        $batch[] = '{"index": {"_index":"user"}}'."\n".json_encode($doc, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE);
        $c++;
        // progress report every 1000 documents
        if ($c % 1000 == 0) echo "\r".($c/$argv[3]*100)."% ";
        if (count($batch) == $argv[1]) {
            $batches[] = implode("\n", $batch);
            $batch = [];
        }
    }
    // flush the last, partially filled batch
    if ($batch) $batches[] = implode("\n", $batch);
    file_put_contents($cache_file_name, serialize($batches));
} else {
    echo "found in cache\n";
    // NOTE(review): the cache lives at a predictable path in /tmp; object instantiation
    // is disabled so a tampered file cannot trigger arbitrary class loading.
    $batches = unserialize(file_get_contents($cache_file_name), ['allowed_classes' => false]);
    if (!is_array($batches)) die("Cache file $cache_file_name is corrupted, remove it and retry\n");
}
// Repeat the prepared batches <multiplier> times to reach the requested data volume.
$batchesMulti = [];
for ($n=0;$n<$argv[5];$n++) $batchesMulti = array_merge($batchesMulti, $batches);
$batches = $batchesMulti;

echo "querying...\n";
$t = microtime(true);
$mh = curl_multi_init();
// A concurrency below 1 could never start a request and would loop forever.
$concurrency = max(1, (int)$argv[2]);
$total = count($batches);
$next = 0;     // index of the next batch to send (avoids O(n) array_shift per request)
$inflight = 0; // handles added to $mh and not yet reaped
$failed = 0;   // requests that failed on transport/HTTP level or reported item errors
$active = 0;
while (true) {
    // keep up to $concurrency requests in flight
    while ($inflight < $concurrency and $next < $total) {
        $payload = $batches[$next];
        unset($batches[$next]); // release the batch once it has been handed to curl
        $next++;
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, "http://127.0.0.1:9200/_bulk");
        curl_setopt($ch, CURLOPT_POST, 1);
        curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/x-ndjson']);
        curl_setopt($ch, CURLOPT_POSTFIELDS, $payload."\n");
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_multi_add_handle($mh, $ch);
        $inflight++;
    }
    curl_multi_exec($mh, $active);
    // Reap finished transfers: check the outcome and detach the handle so that its
    // request/response buffers are released instead of accumulating until exit.
    while ($info = curl_multi_info_read($mh)) {
        $done = $info['handle'];
        $http_code = curl_getinfo($done, CURLINFO_HTTP_CODE);
        // The bulk response reports per-item failures through a top-level "errors" flag
        // near the start of the body; only the head is scanned to keep this cheap.
        $head = substr((string)curl_multi_getcontent($done), 0, 128);
        if ($info['result'] !== CURLE_OK or $http_code < 200 or $http_code >= 300 or strpos($head, '"errors":true') !== false) {
            $failed++;
        }
        curl_multi_remove_handle($mh, $done);
        $inflight--;
    }
    if ($inflight == 0 and $next >= $total) break;
    // Block until there is socket activity instead of spinning on a 1µs timeout;
    // -1 means curl had nothing to wait on yet, so back off briefly.
    if (curl_multi_select($mh, 0.1) === -1) usleep(1000);
}
$elapsed = microtime(true) - $t;
echo "finished inserting\n";
if ($failed > 0) echo "WARNING: $failed bulk request(s) failed or reported errors\n";
echo "Total time: ".$elapsed."\n";
echo round($argv[3] * $argv[5] / $elapsed)." docs per sec\n";
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/php
<?php
// Manticore bulk-load benchmark entry point: refuse to run without the five
// positional arguments, then load vendor/autoload.php.
if (count($argv) < 6) {
    die("Usage: ".__FILE__." <batch size> <concurrency> <docs> <shards> <multiplier>\n");
}
require_once 'vendor/autoload.php';
// Sends $query asynchronously over an idle connection from the global pool.
//
// The first "insert into user" in $query is rewritten to target the table "user<shard>".
// A connection that has never been used is taken immediately; otherwise the function
// polls the pool until some connection has finished its previous query, reaps that
// result and reuses the connection for $query.
//
// Uses globals: $all_links (pool of mysqli connections) and $requests (per-connection
// timestamp of the last dispatched query, keyed like $all_links).
//
// Returns true once $query has been dispatched. Returns false when the reaped query
// had failed, when dispatching fails, or when the pool is empty; in these cases
// $query may not have been sent.
function process($query, $shard) {
    global $all_links;
    global $requests;

    // Rewrite only the statement prefix; replacing every occurrence could also alter
    // row data that happens to contain the same text.
    $needle = 'insert into user';
    $pos = strpos($query, $needle);
    if ($pos !== false) {
        $query = substr_replace($query, $needle.$shard, $pos, strlen($needle));
    }

    // Phase 1: a connection without a recorded request has nothing in flight.
    foreach ($all_links as $k=>$link) {
        if (!empty($requests[$k])) continue;
        if (!mysqli_query($link, $query, MYSQLI_ASYNC)) {
            echo "ERROR: {$link->error}\n";
            return false;
        }
        $requests[$k] = microtime(true);
        return true;
    }

    // Phase 2: every connection is busy, wait for one of them to finish.
    // An empty pool ends the loop instead of polling forever.
    while ($all_links) {
        $links = $errors = $reject = array_values($all_links);
        $count = @mysqli_poll($links, $errors, $reject, 0, 1000);
        if (!($count > 0)) continue;

        foreach ($links as $link) {
            // Works in both mysqli report modes: with exceptions enabled (default since
            // PHP 8.1) a failed query throws, otherwise it sets $link->error.
            try {
                $res = @mysqli_reap_async_query($link);
                $error = (string)$link->error;
            } catch (mysqli_sql_exception $e) {
                $res = false;
                $error = $e->getMessage();
            }
            $i = array_search($link, $all_links, true); // pool key of the ready connection

            if ($error !== '') {
                echo "ERROR: {$error}\n";
                try {
                    $alive = @mysqli_ping($link);
                } catch (mysqli_sql_exception $e) {
                    $alive = false;
                }
                if (!$alive) {
                    echo "ERROR: mysql connection is down, removing it from the pool\n";
                    unset($all_links[$i]); // remove the original link from the pool
                    unset($requests[$i]); // and from the $requests too
                }
                return false;
            }
            // nothing reaped and no error reported: try the next ready connection
            if ($res === false) continue;
            if (is_object($res)) {
                mysqli_free_result($res);
            }
            if (!mysqli_query($link, $query, MYSQLI_ASYNC)) { // making next query
                echo "ERROR: {$link->error}\n";
                return false;
            }
            $requests[$i] = microtime(true);
            return true;
        }
    }

    echo "ERROR: no mysql connections left in the pool\n";
    return false;
}
// Every error check in this script relies on return values and ->error. Since PHP 8.1
// mysqli throws exceptions by default, which would bypass those checks, so switch the
// report mode off explicitly.
mysqli_report(MYSQLI_REPORT_OFF);

$concurrency = (int)$argv[2];
$shards = (int)$argv[4];
if ($concurrency < 1 or $shards < 1) die("<concurrency> and <shards> must be positive integers\n");

// Connection pool shared with process(): one connection per concurrent worker.
$all_links = [];
$requests = [];
for ($i=0;$i<$concurrency;$i++) {
    $m = @mysqli_connect('127.0.0.1', '', '', '', 9306);
    if (mysqli_connect_error()) die("Cannot connect to Manticore\n");
    $all_links[] = $m;
}

// init: recreate the per-shard tables user1..userN and the distributed table "user"
// that lists all of them as local tables.
$ddl = [];
$dist = [];
for ($n=1;$n<=$shards;$n++) {
    $ddl[] = "drop table if exists user$n";
    $ddl[] = "create table user$n(name text, email string, description text, age int, active int)";
    $dist[] = "local='user$n'";
}
$ddl[] = "drop table if exists user";
$ddl[] = "create table user type='distributed' ".implode(' ', $dist);
foreach ($ddl as $sql) {
    // A failed statement is reported rather than silently ignored; the run continues
    // so that the previous best-effort behaviour is kept.
    if (!mysqli_query($all_links[0], $sql)) {
        echo "WARNING: \"$sql\" failed: ".mysqli_error($all_links[0])."\n";
    }
}

// Build (or load from cache) the multi-row INSERT statements, <batch size> rows each.
// id 0 is passed for every row.
$batch = [];
$query_start = "insert into user(id, name, email, description, age, active) values ";
$faker = Faker\Factory::create();
echo "preparing...\n";
$cache_file_name = '/tmp/'.md5($query_start).'_'.$argv[1].'_'.$argv[3];
$c = 0;
if (!file_exists($cache_file_name)) {
    $batches = [];
    while ($c < $argv[3]) {
        // NOTE(review): values are generated locally, so addslashes() is kept as the
        // escaping method; it is not a general-purpose SQL escaping function.
        $ar = [addslashes($faker->name()), addslashes($faker->email()), addslashes($faker->text()), rand(10,90), rand(0,1)];
        $batch[] = "(0,'".$ar[0]."','".$ar[1]."','".$ar[2]."',".$ar[3].",".$ar[4].")";
        $c++;
        // progress report every 1000 rows
        if ($c % 1000 == 0) echo "\r".($c/$argv[3]*100)."% ";
        if (count($batch) == $argv[1]) {
            $batches[] = $query_start.implode(',', $batch);
            $batch = [];
        }
    }
    // flush the last, partially filled batch
    if ($batch) $batches[] = $query_start.implode(',', $batch);
    file_put_contents($cache_file_name, serialize($batches));
} else {
    echo "found in cache $cache_file_name\n";
    // NOTE(review): the cache lives at a predictable path in /tmp; object instantiation
    // is disabled so a tampered file cannot trigger arbitrary class loading.
    $batches = unserialize(file_get_contents($cache_file_name), ['allowed_classes' => false]);
    if (!is_array($batches)) die("Cache file $cache_file_name is corrupted, remove it and retry\n");
}

echo "querying...\n";
$t = microtime(true);
// Send every batch <multiplier> times, each to a randomly chosen shard table.
for ($n=0;$n<$argv[5];$n++) {
    foreach ($batches as $batch) {
        if (!process($batch, rand(1, $shards))) die("ERROR\n");
    }
}
// wait until all the workers finish: loop until every connection is reported by
// mysqli_poll() in one of the three arrays
do {
    $links = $errors = $reject = array();
    foreach ($all_links as $link) $links[] = $errors[] = $reject[] = $link;
    $count = @mysqli_poll($links, $errors, $reject, 0, 100);
} while (count($all_links) != count($links) + count($errors) + count($reject));
$elapsed = microtime(true) - $t;

// Collect the results of the last in-flight queries so that their failures are reported
// too; this happens after the clock has been stopped.
$failed = 0;
foreach ($links as $link) {
    $res = @mysqli_reap_async_query($link);
    if (is_object($res)) mysqli_free_result($res);
    if ($link->error) {
        echo "ERROR: {$link->error}\n";
        $failed++;
    }
}
echo "finished inserting\n";
if ($failed > 0) echo "WARNING: $failed of the final queries failed\n";
echo "Total time: ".$elapsed."\n";
echo round($argv[3]*$argv[5] / $elapsed)." docs per sec\n";
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment