#!/usr/bin/php
<?php
// Bulk-indexing benchmark against a local Elasticsearch instance (127.0.0.1:9200).
// Args: <batch size>   docs per _bulk request
//       <concurrency>  max simultaneous in-flight requests
//       <docs>         number of documents to generate
//       <multiplier>   how many times to replay the generated batches
// Fix: usage line now shows the invocation name ($argv[0]) rather than the
// script's absolute path (__FILE__); dead initialisations of $t/$c removed
// (both were reassigned before first use).
if (count($argv) < 5) die("Usage: ".$argv[0]." <batch size> <concurrency> <docs> <multiplier>\n");

require_once 'vendor/autoload.php';

# Recreate the "user" index from scratch: drop whatever exists, then PUT
# the explicit mappings. A tiny request helper keeps the two calls uniform.
$esRequest = function ($method, $url, $body = null) {
    $handle = curl_init();
    curl_setopt($handle, CURLOPT_URL, $url);
    curl_setopt($handle, CURLOPT_CUSTOMREQUEST, $method);
    curl_setopt($handle, CURLOPT_RETURNTRANSFER, true);
    if ($body !== null) {
        curl_setopt($handle, CURLOPT_HTTPHEADER, ['Content-Type: application/json']);
        curl_setopt($handle, CURLOPT_POSTFIELDS, $body);
    }
    return curl_exec($handle);
};

# delete old index (result ignored — it may not exist yet)
$res = $esRequest("DELETE", "http://127.0.0.1:9200/user");

# create new index with typed mappings
$res = $esRequest("PUT", "http://127.0.0.1:9200/user", '{
  "mappings": {
    "properties": {
      "name": {"type": "text"},
      "email": {"type": "keyword"},
      "description": {"type": "text"},
      "age": {"type": "integer"},
      "active": {"type": "integer"}
    }
  }
}');

$faker = Faker\Factory::create();

# Build NDJSON bulk-request bodies of <batch size> docs each. Generation via
# Faker is slow, so the result is cached on disk keyed by batch size and doc
# count; later runs with the same parameters reload it.
$batches = [];
echo "preparing...\n";
$error = false;
$cache_file_name = '/tmp/json_user_es_'.$argv[1].'_'.$argv[3];
$c = 0;
if (!file_exists($cache_file_name)) {
    $batches = [];
    $batch = []; // fix: was never initialised -> undefined-variable warnings on append/count
    while ($c < $argv[3]) {
        // fix: encode via json_encode instead of string concatenation —
        // Faker names/text can contain quotes or backslashes, which made
        // the hand-built JSON (and thus the whole bulk request) invalid.
        $doc = json_encode([
            'name' => $faker->name(),
            'email' => $faker->email(),
            'description' => $faker->text(),
            'age' => rand(10, 90),
            'active' => rand(0, 1),
        ]);
        // Each doc is an action line + source line, per the _bulk NDJSON format.
        $batch[] = '{"index": {"_index":"user"}}'."\n".$doc;
        $c++;
        if ($c % 1000 == 0) echo "\r".($c/$argv[3]*100)."%       "; // progress every 1000 docs
        if (count($batch) == $argv[1]) {
            $batches[] = implode("\n", $batch);
            $batch = [];
        }
    }
    if ($batch) $batches[] = implode("\n", $batch); // flush the final partial batch
    file_put_contents($cache_file_name, serialize($batches));
} else {
    echo "found in cache\n";
    // NOTE(review): unserialize() of a predictable /tmp path is unsafe on a
    // shared host (another user could plant a payload) — acceptable for a
    // local benchmark, but worth confirming for any other deployment.
    $batches = unserialize(file_get_contents($cache_file_name));
}

# Replay the prepared batches <multiplier> times to scale up the workload
# without regenerating documents.
$expanded = [];
for ($round = 0; $round < $argv[4]; $round++) {
    foreach ($batches as $payload) {
        $expanded[] = $payload;
    }
}
$batches = $expanded;

echo "querying...\n";
$t = microtime(true);

$mh = curl_multi_init();
$active = 0;
$c = 0;

while(true) {
  if ($active < $argv[2] and count($batches) > 0) {
    $ch = curl_init();
    curl_setopt($ch, CURLOPT_URL, "http://127.0.0.1:9200/_bulk?refresh=true");
    curl_setopt($ch, CURLOPT_POST, 1);
    curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/x-ndjson']);
    curl_setopt($ch, CURLOPT_POSTFIELDS, array_shift($batches)."\n");
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
    curl_multi_add_handle($mh, $ch);
    $status = curl_multi_exec($mh, $active);
  }
  $status = curl_multi_exec($mh, $active);
  curl_multi_select($mh, 0.000001);
  if ($active == 0 and count($batches) == 0) break;
}
echo "finished inserting\n";
echo "Total time: ".(microtime(true) - $t)."\n";
echo round($argv[3] * $argv[4] / (microtime(true) - $t))." docs per sec\n";