Last active
November 15, 2015 23:19
-
-
Save areek/4fdd319b855be919e1ac to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Licensed to Elasticsearch under one or more contributor | |
* license agreements. See the NOTICE file distributed with | |
* this work for additional information regarding copyright | |
* ownership. Elasticsearch licenses this file to you under | |
* the Apache License, Version 2.0 (the "License"); you may | |
* not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, | |
* software distributed under the License is distributed on an | |
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
* KIND, either express or implied. See the License for the | |
* specific language governing permissions and limitations | |
* under the License. | |
*/ | |
package org.elasticsearch.benchmark.search; | |
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; | |
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; | |
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse; | |
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; | |
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; | |
import org.elasticsearch.action.bulk.BulkRequestBuilder; | |
import org.elasticsearch.action.bulk.BulkResponse; | |
import org.elasticsearch.action.search.SearchResponse; | |
import org.elasticsearch.action.suggest.SuggestResponse; | |
import org.elasticsearch.client.Client; | |
import org.elasticsearch.client.Requests; | |
import org.elasticsearch.common.Priority; | |
import org.elasticsearch.common.StopWatch; | |
import org.elasticsearch.common.lucene.search.function.CombineFunction; | |
import org.elasticsearch.common.settings.Settings; | |
import org.elasticsearch.common.unit.SizeValue; | |
import org.elasticsearch.common.xcontent.XContentBuilder; | |
import org.elasticsearch.common.xcontent.XContentFactory; | |
import org.elasticsearch.discovery.Discovery; | |
import org.elasticsearch.index.IndexNotFoundException; | |
import org.elasticsearch.index.query.QueryBuilders; | |
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders; | |
import org.elasticsearch.indices.IndexAlreadyExistsException; | |
import org.elasticsearch.node.Node; | |
import org.elasticsearch.search.suggest.Suggest.Suggestion.Entry.Option; | |
import org.elasticsearch.search.suggest.SuggestBuilder; | |
import org.elasticsearch.search.suggest.SuggestBuilders; | |
import org.elasticsearch.transport.TransportModule; | |
import java.io.BufferedReader; | |
import java.io.IOException; | |
import java.nio.file.Files; | |
import java.nio.file.Path; | |
import java.nio.file.Paths; | |
import java.util.*; | |
import java.util.concurrent.Callable; | |
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; | |
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; | |
import static org.elasticsearch.common.settings.Settings.settingsBuilder; | |
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; | |
import static org.elasticsearch.index.query.QueryBuilders.*; | |
import static org.elasticsearch.node.NodeBuilder.nodeBuilder; | |
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; | |
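/**
 * Stand-alone benchmark that indexes a tab-separated data file into a local node and then
 * compares function-scored prefix queries against completion suggestions (with and without
 * payloads) for prefixes of length 1 to 6.
 */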
public class CompletionBenchMark { | |
private static final String INDEX_NAME = "index"; | |
private static final String TYPE_NAME = "type"; | |
private static Client client; | |
public static void main(String[] args) throws Exception { | |
Settings settings = settingsBuilder() | |
.put("index.refresh_interval", "-1") | |
.put(SETTING_NUMBER_OF_SHARDS, 1) | |
.put(SETTING_NUMBER_OF_REPLICAS, 0) | |
.put("path.home", "/tmp/") | |
.put(TransportModule.TRANSPORT_TYPE_KEY, "local") | |
.build(); | |
String clusterName = CompletionBenchMark.class.getSimpleName(); | |
Node node = nodeBuilder().clusterName(clusterName) | |
.settings(settingsBuilder().put(settings)) | |
.node(); | |
client = node.client(); | |
// Delete the index if already present | |
try { | |
client.admin().indices().prepareDelete(INDEX_NAME).get(); | |
} catch (IndexNotFoundException e) { | |
} | |
final Set<String> prefixesUpto1 = new HashSet<>(); | |
final Set<String> prefixesUpto2 = new HashSet<>(); | |
final Set<String> prefixesUpto3 = new HashSet<>(); | |
final Set<String> prefixesUpto4 = new HashSet<>(); | |
final Set<String> prefixesUpto5 = new HashSet<>(); | |
final Set<String> prefixesUpto6 = new HashSet<>(); | |
try { | |
XContentBuilder mapping = jsonBuilder().startObject() | |
.startObject(TYPE_NAME) | |
.startObject("properties") | |
.startObject("title_suggest") | |
.field("type", "completion") | |
.endObject() | |
.startObject("payload") | |
.field("type", "string") | |
.field("index", "not_analyzed") | |
.endObject() | |
.startObject("weight") | |
.field("type", "integer") | |
.endObject() | |
.endObject() | |
.endObject() | |
.endObject(); | |
client.admin().indices().prepareCreate(INDEX_NAME) | |
.addMapping(TYPE_NAME, mapping) | |
.get(); | |
// Index docs | |
BulkRequestBuilder builder = client.prepareBulk(); | |
final Path path = Paths.get("/Users/areek/workspace/allCountries.txt"); | |
try (BufferedReader reader = Files.newBufferedReader(path)) { | |
int c = 0; | |
String line; | |
Map<String, Object> source = new HashMap<>(1); | |
Map<String, Object> map = new HashMap<>(2); | |
while ((line = reader.readLine()) != null) { | |
String[] splits = line.split("\t"); | |
if (splits.length > 0) { | |
int id = Integer.parseInt(splits[0]); | |
String name = splits[1]; | |
String payload = splits[2]; | |
prefixesUpto1.add(name.substring(0, Math.min(name.length(), 1))); | |
prefixesUpto2.add(name.substring(0, Math.min(name.length(), 2))); | |
prefixesUpto3.add(name.substring(0, Math.min(name.length(), 3))); | |
prefixesUpto4.add(name.substring(0, Math.min(name.length(), 4))); | |
prefixesUpto5.add(name.substring(0, Math.min(name.length(), 5))); | |
prefixesUpto6.add(name.substring(0, Math.min(name.length(), 6))); | |
map.put("input", name); | |
map.put("weight", id); | |
source.put("title_suggest", map); | |
source.put("payload", payload); | |
source.put("weight", id); | |
builder.add(client.prepareIndex(INDEX_NAME, TYPE_NAME, String.valueOf(c++)).setSource(source)); | |
map.clear(); | |
source.clear(); | |
if (builder.numberOfActions() >= 1000) { | |
BulkResponse response = builder.get(); | |
if (response.hasFailures()) { | |
System.err.println("--> bulk failed"); | |
} | |
builder = client.prepareBulk(); | |
} | |
} | |
//if (c == 10000) {//5000000) { | |
// break; | |
//} | |
} | |
} | |
if (builder.numberOfActions() > 0) { | |
BulkResponse response = builder.get(); | |
if (response.hasFailures()) { | |
System.err.println("--> bulk failed"); | |
} | |
} | |
} catch (IndexAlreadyExistsException e) { | |
System.out.println("--> Index already exists, ignoring indexing phase, waiting for green"); | |
ClusterHealthResponse clusterHealthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().setTimeout("10m").execute().actionGet(); | |
if (clusterHealthResponse.isTimedOut()) { | |
System.err.println("--> Timed out waiting for cluster health"); | |
} | |
} | |
client.admin().indices().prepareForceMerge().setMaxNumSegments(1).get(); | |
client.admin().indices().prepareRefresh().execute().actionGet(); | |
System.out.println("num docs: " + client.prepareSearch().setSize(0).setQuery(matchAllQuery()).execute().actionGet().getHits().totalHits()); | |
System.out.println("completion size: " + client.admin().indices().prepareStats(INDEX_NAME).setCompletionFields("title_suggest").get().getPrimaries().getCompletion().getSize()); | |
System.out.println("------------------------------------------------------------------------"); | |
System.out.println(String.format(Locale.ROOT, "%40s | %s | %10s | %s", "name", "# queries", "time(ms)", "KQPS")); | |
System.out.println("------------------------------------------------------------------------"); | |
runPerformanceTestsForPrefix(1, 5, prefixesUpto1); | |
System.out.println("------------------------------------------------------------------------"); | |
runPerformanceTestsForPrefix(2, 5, prefixesUpto2); | |
System.out.println("------------------------------------------------------------------------"); | |
runPerformanceTestsForPrefix(3, 5, prefixesUpto3); | |
System.out.println("------------------------------------------------------------------------"); | |
runPerformanceTestsForPrefix(4, 5, prefixesUpto4); | |
System.out.println("------------------------------------------------------------------------"); | |
runPerformanceTestsForPrefix(5, 5, prefixesUpto5); | |
System.out.println("------------------------------------------------------------------------"); | |
runPerformanceTestsForPrefix(6, 5, prefixesUpto6); | |
System.out.println("------------------------------------------------------------------------"); | |
client.close(); | |
node.close(); | |
} | |
private static void runPerformanceTestsForPrefix(int maxLen, final int size, Collection<String> prefixes) { | |
BenchmarkResult result = runPerformanceTest(prefixes, new Bench() { | |
@Override | |
public int query(String prefix) { | |
return (int) client.prepareSearch(INDEX_NAME).setSize(size).setQuery( | |
functionScoreQuery(prefixQuery("title_suggest", prefix), ScoreFunctionBuilders.fieldValueFactorFunction("weight")).boostMode(CombineFunction.REPLACE)) | |
.get().getHits().getTotalHits(); | |
} | |
}); | |
System.out.println(String.format(Locale.ROOT, "%40s | %9d | %s +- %s | %s", "prefix query (prefix length: " + maxLen + ")", | |
prefixes.size(), Math.ceil(result.avg), Math.ceil(result.stddev), Math.ceil(prefixes.size() / result.avg))); | |
result = runPerformanceTest(prefixes, new Bench() { | |
@Override | |
public int query(String prefix) { | |
return client.prepareSuggest(INDEX_NAME).addSuggestion( | |
SuggestBuilders.completionSuggestion("prefix").field("title_suggest").prefix(prefix).size(size)) | |
.get().getSuggest().getSuggestion("prefix").getEntries().get(0).getOptions().size(); | |
} | |
}); | |
System.out.println(String.format(Locale.ROOT, "%40s | %9d | %s +- %s | %s", "completion (prefix length: " + maxLen + ")", | |
prefixes.size(), Math.ceil(result.avg), Math.ceil(result.stddev), Math.ceil(prefixes.size() / result.avg))); | |
result = runPerformanceTest(prefixes, new Bench() { | |
@Override | |
public int query(String prefix) { | |
return client.prepareSuggest(INDEX_NAME).addSuggestion( | |
SuggestBuilders.completionSuggestion("prefix").field("title_suggest").prefix(prefix).payload("payload").size(size)) | |
.get().getSuggest().getSuggestion("prefix").getEntries().get(0).getOptions().size(); | |
} | |
}); | |
System.out.println(String.format(Locale.ROOT, "%40s | %9d | %s +- %s | %s", "completion w/ payload (prefix length: " + maxLen + ")", | |
prefixes.size(), Math.ceil(result.avg), Math.ceil(result.stddev), Math.ceil(prefixes.size() / result.avg))); | |
} | |
/** One benchmarked operation, invoked once per prefix in every round. */
interface Bench {
    /**
     * Executes the operation for a single prefix.
     *
     * @param prefix the prefix to look up
     * @return an integer derived from the response; callers sum it so the call has an observable result
     */
    int query(String prefix);
}
private final static int rounds = 15; | |
private final static int warmup = 5; | |
/** Guard against opts. */ | |
@SuppressWarnings("unused") | |
private static volatile int guard; | |
private static BenchmarkResult runPerformanceTest(Collection<String> prefixes, Bench bench) { | |
final double NANOS_PER_MS = 1000000; | |
try { | |
List<Double> times = new ArrayList<>(); | |
for (int i = 0; i < warmup + rounds; i++) { | |
int v = 0; | |
final long start = System.nanoTime(); | |
for (String prefix : prefixes) { | |
v += bench.query(prefix); | |
} | |
guard = v; | |
times.add((System.nanoTime() - start) / NANOS_PER_MS); | |
} | |
return new BenchmarkResult(times, warmup); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
private static class BenchmarkResult { | |
/** Average time per round (ms). */ | |
public final double avg; | |
/** | |
* Standard deviation (in milliseconds). | |
*/ | |
public final double stddev; | |
public BenchmarkResult(List<Double> times, int warmup) { | |
List<Double> values = times.subList(warmup, times.size()); | |
double sum = 0; | |
double sumSquares = 0; | |
for (double l : values) { | |
sum += l; | |
sumSquares += l * l; | |
} | |
this.avg = sum / (double) values.size(); | |
this.stddev = Math.sqrt(sumSquares / (double) values.size() - this.avg * this.avg); | |
} | |
@Override | |
public String toString() { | |
return String.format(Locale.ROOT, "%.0f [+- %.2f]", | |
avg, stddev); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment