Skip to content

Instantly share code, notes, and snippets.

@areek
Last active November 15, 2015 23:19
Show Gist options
  • Save areek/4fdd319b855be919e1ac to your computer and use it in GitHub Desktop.
Save areek/4fdd319b855be919e1ac to your computer and use it in GitHub Desktop.
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.benchmark.search;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.suggest.SuggestResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.lucene.search.function.CombineFunction;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.node.Node;
import org.elasticsearch.search.suggest.Suggest.Suggestion.Entry.Option;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.transport.TransportModule;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.Callable;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
/**
*/
public class CompletionBenchMark {
private static final String INDEX_NAME = "index";
private static final String TYPE_NAME = "type";
private static Client client;
public static void main(String[] args) throws Exception {
Settings settings = settingsBuilder()
.put("index.refresh_interval", "-1")
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0)
.put("path.home", "/tmp/")
.put(TransportModule.TRANSPORT_TYPE_KEY, "local")
.build();
String clusterName = CompletionBenchMark.class.getSimpleName();
Node node = nodeBuilder().clusterName(clusterName)
.settings(settingsBuilder().put(settings))
.node();
client = node.client();
// Delete the index if already present
try {
client.admin().indices().prepareDelete(INDEX_NAME).get();
} catch (IndexNotFoundException e) {
}
final Set<String> prefixesUpto1 = new HashSet<>();
final Set<String> prefixesUpto2 = new HashSet<>();
final Set<String> prefixesUpto3 = new HashSet<>();
final Set<String> prefixesUpto4 = new HashSet<>();
final Set<String> prefixesUpto5 = new HashSet<>();
final Set<String> prefixesUpto6 = new HashSet<>();
try {
XContentBuilder mapping = jsonBuilder().startObject()
.startObject(TYPE_NAME)
.startObject("properties")
.startObject("title_suggest")
.field("type", "completion")
.endObject()
.startObject("payload")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject("weight")
.field("type", "integer")
.endObject()
.endObject()
.endObject()
.endObject();
client.admin().indices().prepareCreate(INDEX_NAME)
.addMapping(TYPE_NAME, mapping)
.get();
// Index docs
BulkRequestBuilder builder = client.prepareBulk();
final Path path = Paths.get("/Users/areek/workspace/allCountries.txt");
try (BufferedReader reader = Files.newBufferedReader(path)) {
int c = 0;
String line;
Map<String, Object> source = new HashMap<>(1);
Map<String, Object> map = new HashMap<>(2);
while ((line = reader.readLine()) != null) {
String[] splits = line.split("\t");
if (splits.length > 0) {
int id = Integer.parseInt(splits[0]);
String name = splits[1];
String payload = splits[2];
prefixesUpto1.add(name.substring(0, Math.min(name.length(), 1)));
prefixesUpto2.add(name.substring(0, Math.min(name.length(), 2)));
prefixesUpto3.add(name.substring(0, Math.min(name.length(), 3)));
prefixesUpto4.add(name.substring(0, Math.min(name.length(), 4)));
prefixesUpto5.add(name.substring(0, Math.min(name.length(), 5)));
prefixesUpto6.add(name.substring(0, Math.min(name.length(), 6)));
map.put("input", name);
map.put("weight", id);
source.put("title_suggest", map);
source.put("payload", payload);
source.put("weight", id);
builder.add(client.prepareIndex(INDEX_NAME, TYPE_NAME, String.valueOf(c++)).setSource(source));
map.clear();
source.clear();
if (builder.numberOfActions() >= 1000) {
BulkResponse response = builder.get();
if (response.hasFailures()) {
System.err.println("--> bulk failed");
}
builder = client.prepareBulk();
}
}
//if (c == 10000) {//5000000) {
// break;
//}
}
}
if (builder.numberOfActions() > 0) {
BulkResponse response = builder.get();
if (response.hasFailures()) {
System.err.println("--> bulk failed");
}
}
} catch (IndexAlreadyExistsException e) {
System.out.println("--> Index already exists, ignoring indexing phase, waiting for green");
ClusterHealthResponse clusterHealthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().setTimeout("10m").execute().actionGet();
if (clusterHealthResponse.isTimedOut()) {
System.err.println("--> Timed out waiting for cluster health");
}
}
client.admin().indices().prepareForceMerge().setMaxNumSegments(1).get();
client.admin().indices().prepareRefresh().execute().actionGet();
System.out.println("num docs: " + client.prepareSearch().setSize(0).setQuery(matchAllQuery()).execute().actionGet().getHits().totalHits());
System.out.println("completion size: " + client.admin().indices().prepareStats(INDEX_NAME).setCompletionFields("title_suggest").get().getPrimaries().getCompletion().getSize());
System.out.println("------------------------------------------------------------------------");
System.out.println(String.format(Locale.ROOT, "%40s | %s | %10s | %s", "name", "# queries", "time(ms)", "KQPS"));
System.out.println("------------------------------------------------------------------------");
runPerformanceTestsForPrefix(1, 5, prefixesUpto1);
System.out.println("------------------------------------------------------------------------");
runPerformanceTestsForPrefix(2, 5, prefixesUpto2);
System.out.println("------------------------------------------------------------------------");
runPerformanceTestsForPrefix(3, 5, prefixesUpto3);
System.out.println("------------------------------------------------------------------------");
runPerformanceTestsForPrefix(4, 5, prefixesUpto4);
System.out.println("------------------------------------------------------------------------");
runPerformanceTestsForPrefix(5, 5, prefixesUpto5);
System.out.println("------------------------------------------------------------------------");
runPerformanceTestsForPrefix(6, 5, prefixesUpto6);
System.out.println("------------------------------------------------------------------------");
client.close();
node.close();
}
private static void runPerformanceTestsForPrefix(int maxLen, final int size, Collection<String> prefixes) {
BenchmarkResult result = runPerformanceTest(prefixes, new Bench() {
@Override
public int query(String prefix) {
return (int) client.prepareSearch(INDEX_NAME).setSize(size).setQuery(
functionScoreQuery(prefixQuery("title_suggest", prefix), ScoreFunctionBuilders.fieldValueFactorFunction("weight")).boostMode(CombineFunction.REPLACE))
.get().getHits().getTotalHits();
}
});
System.out.println(String.format(Locale.ROOT, "%40s | %9d | %s +- %s | %s", "prefix query (prefix length: " + maxLen + ")",
prefixes.size(), Math.ceil(result.avg), Math.ceil(result.stddev), Math.ceil(prefixes.size() / result.avg)));
result = runPerformanceTest(prefixes, new Bench() {
@Override
public int query(String prefix) {
return client.prepareSuggest(INDEX_NAME).addSuggestion(
SuggestBuilders.completionSuggestion("prefix").field("title_suggest").prefix(prefix).size(size))
.get().getSuggest().getSuggestion("prefix").getEntries().get(0).getOptions().size();
}
});
System.out.println(String.format(Locale.ROOT, "%40s | %9d | %s +- %s | %s", "completion (prefix length: " + maxLen + ")",
prefixes.size(), Math.ceil(result.avg), Math.ceil(result.stddev), Math.ceil(prefixes.size() / result.avg)));
result = runPerformanceTest(prefixes, new Bench() {
@Override
public int query(String prefix) {
return client.prepareSuggest(INDEX_NAME).addSuggestion(
SuggestBuilders.completionSuggestion("prefix").field("title_suggest").prefix(prefix).payload("payload").size(size))
.get().getSuggest().getSuggestion("prefix").getEntries().get(0).getOptions().size();
}
});
System.out.println(String.format(Locale.ROOT, "%40s | %9d | %s +- %s | %s", "completion w/ payload (prefix length: " + maxLen + ")",
prefixes.size(), Math.ceil(result.avg), Math.ceil(result.stddev), Math.ceil(prefixes.size() / result.avg)));
}
interface Bench {
int query(String prefix);
}
private final static int rounds = 15;
private final static int warmup = 5;
/** Guard against opts. */
@SuppressWarnings("unused")
private static volatile int guard;
private static BenchmarkResult runPerformanceTest(Collection<String> prefixes, Bench bench) {
final double NANOS_PER_MS = 1000000;
try {
List<Double> times = new ArrayList<>();
for (int i = 0; i < warmup + rounds; i++) {
int v = 0;
final long start = System.nanoTime();
for (String prefix : prefixes) {
v += bench.query(prefix);
}
guard = v;
times.add((System.nanoTime() - start) / NANOS_PER_MS);
}
return new BenchmarkResult(times, warmup);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private static class BenchmarkResult {
/** Average time per round (ms). */
public final double avg;
/**
* Standard deviation (in milliseconds).
*/
public final double stddev;
public BenchmarkResult(List<Double> times, int warmup) {
List<Double> values = times.subList(warmup, times.size());
double sum = 0;
double sumSquares = 0;
for (double l : values) {
sum += l;
sumSquares += l * l;
}
this.avg = sum / (double) values.size();
this.stddev = Math.sqrt(sumSquares / (double) values.size() - this.avg * this.avg);
}
@Override
public String toString() {
return String.format(Locale.ROOT, "%.0f [+- %.2f]",
avg, stddev);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment