Skip to content

Instantly share code, notes, and snippets.

@peschlowp
Created June 12, 2014 08:58
Show Gist options
  • Save peschlowp/c9ce27bbaf7c3afe3e19 to your computer and use it in GitHub Desktop.
Save peschlowp/c9ce27bbaf7c3afe3e19 to your computer and use it in GitHub Desktop.
An Elasticsearch aggregations example that computes statistics over the durations of a document's processing stages. To try it, run the class's main method with Elasticsearch 1.2 on the classpath.
package peschlowp.test.elasticsearch;
import java.io.IOException;
import java.util.Date;
import java.util.Map;
import java.util.Random;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.metrics.avg.InternalAvg;
import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
import org.elasticsearch.search.aggregations.metrics.min.InternalMin;
import org.elasticsearch.search.aggregations.metrics.sum.InternalSum;
/**
 * Self-contained demo of Elasticsearch metric aggregations computed over script values.
 *
 * <p>Starts an embedded local node, recreates a single-shard test index, and inserts a small
 * number of documents. Each document records a {@code start} timestamp plus the completion
 * timestamp of three consecutive processing stages. Script-based {@code avg}, {@code min},
 * {@code max} and {@code sum} aggregations then derive per-stage and overall durations from
 * those timestamps.
 *
 * <p>Written against the Elasticsearch 1.2 Java API. Dynamic scripting is enabled on the
 * embedded node because the aggregations use inline scripts.
 */
public class Aggregations {

    /** Entry point: runs the whole demo once. */
    public static void main(String[] args) {
        new Aggregations().run();
    }

    private static final String INDEX_NAME = "aggregations-test-index";
    private static final String TYPE_NAME = "aggregations-test-type";
    /** Reference "now"; random document start times are drawn from the hour before it. */
    private static final long START_TIME = System.currentTimeMillis();
    private static final int NUM_TEST_DOCS = 10;
    /** Fixed seed so the generated durations are the same on every run. */
    private static final Random RNG = new Random(1234);

    /**
     * The consecutive processing stages a test document goes through, in order.
     *
     * <p>Each stage's simulated duration is added to a millisecond timestamp. It is drawn
     * uniformly from {@code [minDuration, maxDuration)}; the upper bound is exclusive because
     * of {@link Random#nextInt(int)}.
     */
    private enum Stage {
        STAGE_1("stage1", 100, 500),
        STAGE_2("stage2", 1000, 3000),
        STAGE_3("stage3", 5000, 6000);

        private final String name;
        private final int minDuration;
        private final int maxDuration;

        private Stage(String name, int minDuration, int maxDuration) {
            this.name = name;
            this.minDuration = minDuration;
            this.maxDuration = maxDuration;
        }

        /** Prefix of this stage's {@code <name>_done} document field. */
        public String getName() {
            return name;
        }

        /** Inclusive lower bound of the simulated duration. */
        public int getMinDuration() {
            return minDuration;
        }

        /** Exclusive upper bound of the simulated duration. */
        public int getMaxDuration() {
            return maxDuration;
        }
    }

    private Node node;
    private Client client;

    /**
     * Starts a local embedded node with dynamic scripting enabled.
     *
     * <p>Then recreates the test index with one shard and no replicas.
     */
    private void init() {
        node =
            NodeBuilder.nodeBuilder().local(true)
                .settings(ImmutableSettings.settingsBuilder().put("script.disable_dynamic", false).build()).node();
        client = node.client();
        // Let the freshly started node recover its cluster state before the existence check,
        // so an index persisted by a previous run is actually visible.
        client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet();
        // Deleting a non-existent index fails (e.g. on the very first run against an empty
        // data directory), so only delete when there is something to delete.
        if (client.admin().indices().prepareExists(INDEX_NAME).execute().actionGet().isExists()) {
            client.admin().indices().prepareDelete(INDEX_NAME).execute().actionGet();
        }
        client
            .admin()
            .indices()
            .prepareCreate(INDEX_NAME)
            .setSettings(
                ImmutableSettings.settingsBuilder().put("number_of_shards", 1).put("number_of_replicas", 0).build())
            .execute().actionGet();
    }

    /**
     * Closes the client and the embedded node.
     *
     * <p>Null-safe, so it can run even when {@link #init()} failed part-way.
     */
    private void release() {
        if (client != null) {
            client.close();
        }
        if (node != null) {
            node.close();
        }
    }

    /**
     * Runs the demo: setup, indexing, aggregation.
     *
     * <p>The node and client are always released, even if a step throws.
     */
    private void run() {
        try {
            init();
            insertDocuments();
            aggregate();
        } finally {
            release();
        }
    }

    /**
     * Indexes {@code NUM_TEST_DOCS} documents.
     *
     * <p>Each document has a random {@code start} within the last hour, followed by one
     * {@code <stage>_done} timestamp per {@link Stage}. The stage timestamps are cumulative:
     * each stage finishes its simulated duration after the previous one.
     *
     * <p>Failures are reported and skipped per document (best effort) rather than aborting the
     * whole run.
     */
    private void insertDocuments() {
        System.out.println("Inserting " + NUM_TEST_DOCS + " documents");
        for (int i = 0; i < NUM_TEST_DOCS; i++) {
            try {
                String id = String.valueOf(i);
                XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("id", id);
                long currentTimestamp = randomTimestampLastHour();
                builder.field("start", new Date(currentTimestamp));
                for (Stage stage : Stage.values()) {
                    long duration = stage.getMinDuration() + RNG.nextInt(stage.getMaxDuration() - stage.getMinDuration());
                    currentTimestamp += duration;
                    builder.field(stage.getName() + "_done", new Date(currentTimestamp));
                }
                builder.endObject();
                System.out.println("Inserting document " + id);
                add(id, builder);
            } catch (Exception e) {
                // Deliberately best-effort: report and continue with the next document.
                System.err.println("Exception during insert: " + e);
            }
        }
    }

    /**
     * Runs one search with six script-based metric aggregations and prints the results.
     *
     * <p>The aggregations are the per-stage averages plus the min/max/sum of the overall
     * (start to stage3 done) duration.
     */
    private void aggregate() {
        AbstractAggregationBuilder averageDurationStage1 =
            AggregationBuilders.avg("averageDurationStage1").script(stage1Duration());
        AbstractAggregationBuilder averageDurationStage2 =
            AggregationBuilders.avg("averageDurationStage2").script(stage2Duration());
        AbstractAggregationBuilder averageDurationStage3 =
            AggregationBuilders.avg("averageDurationStage3").script(stage3Duration());
        AbstractAggregationBuilder shortestOverallDuration =
            AggregationBuilders.min("shortestOverallDuration").script(totalDuration());
        AbstractAggregationBuilder longestOverallDuration =
            AggregationBuilders.max("longestOverallDuration").script(totalDuration());
        AbstractAggregationBuilder totalProcessingTime =
            AggregationBuilders.sum("totalProcessingTime").script(totalDuration());
        SearchResponse response =
            client.prepareSearch(INDEX_NAME).setTypes(TYPE_NAME).addAggregation(averageDurationStage1)
                .addAggregation(averageDurationStage2).addAggregation(averageDurationStage3)
                .addAggregation(shortestOverallDuration).addAggregation(longestOverallDuration)
                .addAggregation(totalProcessingTime).execute().actionGet();
        printResults(response);
    }

    /** Script: time from {@code start} until stage 1 is done. */
    private String stage1Duration() {
        return "doc['stage1_done'].value - doc['start'].value";
    }

    /** Script: time from stage 1 done until stage 2 is done. */
    private String stage2Duration() {
        return "doc['stage2_done'].value - doc['stage1_done'].value";
    }

    /** Script: time from stage 2 done until stage 3 is done. */
    private String stage3Duration() {
        return "doc['stage3_done'].value - doc['stage2_done'].value";
    }

    /** Script: time from {@code start} until the last stage is done. */
    private String totalDuration() {
        return "doc['stage3_done'].value - doc['start'].value";
    }

    /** Prints the six aggregation values that {@link #aggregate()} requested, by name. */
    private void printResults(SearchResponse response) {
        Map<String, Aggregation> results = response.getAggregations().asMap();
        InternalAvg averageDurationStage1 = (InternalAvg) results.get("averageDurationStage1");
        InternalAvg averageDurationStage2 = (InternalAvg) results.get("averageDurationStage2");
        InternalAvg averageDurationStage3 = (InternalAvg) results.get("averageDurationStage3");
        InternalMin shortestOverallDuration = (InternalMin) results.get("shortestOverallDuration");
        InternalMax longestOverallDuration = (InternalMax) results.get("longestOverallDuration");
        InternalSum totalProcessingTime = (InternalSum) results.get("totalProcessingTime");
        System.out.println("average duration of stage1: " + averageDurationStage1.getValue());
        System.out.println("average duration of stage2: " + averageDurationStage2.getValue());
        System.out.println("average duration of stage3: " + averageDurationStage3.getValue());
        System.out.println("shortest overall duration: " + shortestOverallDuration.getValue());
        System.out.println("longest overall duration: " + longestOverallDuration.getValue());
        System.out.println("total processing time: " + totalProcessingTime.getValue());
    }

    /** Returns a random epoch-millisecond timestamp within the hour before {@code START_TIME}. */
    private long randomTimestampLastHour() {
        return START_TIME - RNG.nextInt(3_600_000);
    }

    /**
     * Indexes {@code source} under {@code id}.
     *
     * <p>The index is refreshed right away so the document is searchable before
     * {@link #aggregate()} runs.
     */
    private void add(String id, XContentBuilder source) throws IOException {
        client.prepareIndex(INDEX_NAME, TYPE_NAME, id).setSource(source).setRefresh(true).execute().actionGet();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment