@sanikolaev, last active July 12, 2022
Scripts to load data into Manticore Search and Elasticsearch with a given concurrency and batch size. The first script targets Elasticsearch (HTTP on port 9200); the second targets Manticore Search (HTTP on port 9308).
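Both scripts take the same four positional arguments: <batch size> <concurrency> <docs> <multiplier>. For example, assuming the files are saved as load_es.php and load_manticore.php (the file names are hypothetical, not part of the gist), the following loads 100,000 generated documents in batches of 1,000 over 4 concurrent connections, replaying the whole set twice:

php load_es.php 1000 4 100000 2
php load_manticore.php 1000 4 100000 2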
#!/usr/bin/php
<?php
if (count($argv) < 5) die("Usage: ".__FILE__." <batch size> <concurrency> <docs> <multiplier>\n");
require_once 'vendor/autoload.php'; # Composer autoload; Faker is the only dependency used below
$t = microtime(true);
$c = 0;
# delete old index
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, "http://127.0.0.1:9200/user");
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, "DELETE");
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
$res = curl_exec($ch);
# create new index
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, "http://127.0.0.1:9200/user");
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, "PUT");
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/json']);
curl_setopt($ch, CURLOPT_POSTFIELDS, '{
    "mappings": {
        "properties": {
            "name":        {"type": "text"},
            "email":       {"type": "keyword"},
            "description": {"type": "text"},
            "age":         {"type": "integer"},
            "active":      {"type": "integer"}
        }
    }
}');
$res = curl_exec($ch);
$faker = Faker\Factory::create();
$batches = [];
echo "preparing...\n";
$cache_file_name = '/tmp/json_user_es_'.$argv[1].'_'.$argv[3];
$c = 0;
if (!file_exists($cache_file_name)) {
    $batches = [];
    $batch = [];
    while ($c < $argv[3]) {
        $doc = ['name' => $faker->name(), 'email' => $faker->email(), 'description' => $faker->text(), 'age' => rand(10,90), 'active' => rand(0,1)];
        # Elasticsearch bulk format: an action line, then the document itself, newline-delimited (NDJSON);
        # json_encode() is used so that quotes in the generated values can't break the payload
        $batch[] = '{"index": {"_index": "user"}}'."\n".json_encode($doc);
        $c++;
        if ($c % 1000 == 0) echo "\r".($c/$argv[3]*100)."% ";
        if (count($batch) == $argv[1]) {
            $batches[] = implode("\n", $batch);
            $batch = [];
        }
    }
    if ($batch) $batches[] = implode("\n", $batch);
    file_put_contents($cache_file_name, serialize($batches));
} else {
    echo "found in cache\n";
    $batches = unserialize(file_get_contents($cache_file_name));
}
# replay the prepared batches <multiplier> times
$batchesMulti = [];
for ($n = 0; $n < $argv[4]; $n++) $batchesMulti = array_merge($batchesMulti, $batches);
$batches = $batchesMulti;
echo "querying...\n";
$t = microtime(true);
$mh = curl_multi_init();
$active = 0;
$c = 0;
while (true) {
    # top up the pool until <concurrency> requests are in flight
    if ($active < $argv[2] and count($batches) > 0) {
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, "http://127.0.0.1:9200/_bulk?refresh=true");
        curl_setopt($ch, CURLOPT_POST, 1);
        curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/x-ndjson']);
        curl_setopt($ch, CURLOPT_POSTFIELDS, array_shift($batches)."\n");
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_multi_add_handle($mh, $ch);
        curl_multi_exec($mh, $active);
    }
    curl_multi_exec($mh, $active);
    curl_multi_select($mh, 0.000001);
    # drop finished handles so they don't accumulate in the multi handle
    while ($info = curl_multi_info_read($mh)) {
        curl_multi_remove_handle($mh, $info['handle']);
        curl_close($info['handle']);
    }
    if ($active == 0 and count($batches) == 0) break;
}
echo "finished inserting\n";
echo "Total time: ".(microtime(true) - $t)."\n";
echo round($argv[3] * $argv[4] / (microtime(true) - $t))." docs per sec\n";
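To sanity-check the load, here is a minimal sketch (not part of the original gist) that asks Elasticsearch for the document count of the user index via its _count endpoint:

#!/usr/bin/php
<?php
# sketch: report how many documents ended up in the "user" index
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, "http://127.0.0.1:9200/user/_count");
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
$res = json_decode(curl_exec($ch), true);
echo "docs in user: ".$res['count']."\n";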
The same loader, adapted for Manticore Search:

#!/usr/bin/php
<?php
if (count($argv) < 5) die("Usage: ".__FILE__." <batch size> <concurrency> <docs> <multiplier>\n");
require_once 'vendor/autoload.php';
$t = microtime(true);
$c = 0;
# delete old index
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, "http://127.0.0.1:9308/sql?mode=raw");
curl_setopt($ch, CURLOPT_POST, 1);
curl_setopt($ch, CURLOPT_POSTFIELDS, 'query='.urlencode('drop table if exists user'));
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
$res = curl_exec($ch);
# create new index
curl_setopt($ch, CURLOPT_POSTFIELDS, 'query='.urlencode('create table user(name text, email string, description text, age int, active int)'));
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
$res = curl_exec($ch);
$faker = Faker\Factory::create();
$batches = [];
echo "preparing...\n";
$cache_file_name = '/tmp/json_user_'.$argv[1].'_'.$argv[3];
$c = 0;
if (!file_exists($cache_file_name)) {
    $batches = [];
    $batch = [];
    while ($c < $argv[3]) {
        $doc = ['name' => $faker->name(), 'email' => $faker->email(), 'description' => $faker->text(), 'age' => rand(10,90), 'active' => rand(0,1)];
        # Manticore bulk format: one self-contained "insert" object per line (NDJSON);
        # json_encode() is used so that quotes in the generated values can't break the payload
        $batch[] = json_encode(['insert' => ['index' => 'user', 'doc' => $doc]]);
        $c++;
        if ($c % 1000 == 0) echo "\r".($c/$argv[3]*100)."% ";
        if (count($batch) == $argv[1]) {
            $batches[] = implode("\n", $batch);
            $batch = [];
        }
    }
    if ($batch) $batches[] = implode("\n", $batch);
    file_put_contents($cache_file_name, serialize($batches));
} else {
    echo "found in cache\n";
    $batches = unserialize(file_get_contents($cache_file_name));
}
# replay the prepared batches <multiplier> times
$batchesMulti = [];
for ($n = 0; $n < $argv[4]; $n++) $batchesMulti = array_merge($batchesMulti, $batches);
$batches = $batchesMulti;
echo "querying...\n";
$t = microtime(true);
$mh = curl_multi_init();
$active = 0;
$c = 0;
while (true) {
    # top up the pool until <concurrency> requests are in flight
    if ($active < $argv[2] and count($batches) > 0) {
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, "http://127.0.0.1:9308/bulk");
        curl_setopt($ch, CURLOPT_POST, 1);
        curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/x-ndjson']);
        curl_setopt($ch, CURLOPT_POSTFIELDS, array_shift($batches));
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_multi_add_handle($mh, $ch);
        curl_multi_exec($mh, $active);
    }
    curl_multi_exec($mh, $active);
    curl_multi_select($mh, 0.000001);
    # drop finished handles so they don't accumulate in the multi handle
    while ($info = curl_multi_info_read($mh)) {
        curl_multi_remove_handle($mh, $info['handle']);
        curl_close($info['handle']);
    }
    if ($active == 0 and count($batches) == 0) break;
}
echo "finished inserting\n";
echo "Total time: ".(microtime(true) - $t)."\n";
echo round($argv[3] * $argv[4] / (microtime(true) - $t))." docs per sec\n";
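A matching sanity check for Manticore, again a sketch rather than part of the gist, using the same /sql endpoint the loader uses for its DDL:

#!/usr/bin/php
<?php
# sketch: report how many rows ended up in the "user" table
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, "http://127.0.0.1:9308/sql?mode=raw");
curl_setopt($ch, CURLOPT_POST, 1);
curl_setopt($ch, CURLOPT_POSTFIELDS, 'query='.urlencode('select count(*) from user'));
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
echo curl_exec($ch)."\n";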