Skip to content

Instantly share code, notes, and snippets.

@s1monw
Created October 15, 2013 20:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save s1monw/6998524 to your computer and use it in GitHub Desktop.
Save s1monw/6998524 to your computer and use it in GitHub Desktop.
diff --git a/src/main/java/org/elasticsearch/action/terms/ShardTermsByQueryRequest.java b/src/main/java/org/elasticsearch/action/terms/ShardTermsByQueryRequest.java
index 3a16c84..7f31877 100644
--- a/src/main/java/org/elasticsearch/action/terms/ShardTermsByQueryRequest.java
+++ b/src/main/java/org/elasticsearch/action/terms/ShardTermsByQueryRequest.java
@@ -150,7 +150,7 @@ class ShardTermsByQueryRequest extends BroadcastShardOperationRequest {
} else {
out.writeVInt(0);
}
- out.writeString(field);
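+ out.writeString(field); // FIXME: field has been observed to be null here - find out why it is not set before serialization (presumably what makes this write fail; TODO confirm)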
out.writeVLong(nowInMillis);
}
}
diff --git a/src/main/java/org/elasticsearch/action/terms/TermsByQueryResponse.java b/src/main/java/org/elasticsearch/action/terms/TermsByQueryResponse.java
index 15b353e..b5f2570 100644
--- a/src/main/java/org/elasticsearch/action/terms/TermsByQueryResponse.java
+++ b/src/main/java/org/elasticsearch/action/terms/TermsByQueryResponse.java
@@ -51,12 +51,17 @@ public class TermsByQueryResponse extends BroadcastOperationResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
- responseTerms = ResponseTerms.readIn(in);
+ if (in.readBoolean()) {
+ responseTerms = ResponseTerms.readIn(in);
+ }
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
- responseTerms.writeTo(out);
+ out.writeBoolean(responseTerms != null);
+ if (responseTerms != null) {
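+ responseTerms.writeTo(out); // only reached when non-null: responseTerms can be null for a reason not yet understood, hence the presence flag written just before this guard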
+ }
}
}
diff --git a/src/main/java/org/elasticsearch/action/terms/TransportTermsByQueryAction.java b/src/main/java/org/elasticsearch/action/terms/TransportTermsByQueryAction.java
index 0c57d17..8c53fa8 100644
--- a/src/main/java/org/elasticsearch/action/terms/TransportTermsByQueryAction.java
+++ b/src/main/java/org/elasticsearch/action/terms/TransportTermsByQueryAction.java
@@ -42,6 +42,7 @@ import org.elasticsearch.common.lucene.search.XConstantScoreQuery;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.index.engine.Engine.Searcher;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.query.ParsedQuery;
@@ -178,23 +179,22 @@ public class TransportTermsByQueryAction
protected ShardTermsByQueryResponse shardOperation(ShardTermsByQueryRequest request) throws ElasticSearchException {
IndexService indexService = indicesService.indexServiceSafe(request.index());
IndexShard indexShard = indexService.shardSafe(request.shardId());
-
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.index(), request.shardId());
- SearchContext context = new DefaultSearchContext(0,
- new ShardSearchRequest().types(request.types())
- .filteringAliases(request.filteringAliases())
- .nowInMillis(request.nowInMillis()),
- shardTarget, indexShard.acquireSearcher("termsByQuery"), indexService, indexShard,
+ ShardSearchRequest shardSearchReq = new ShardSearchRequest().types(request.types())
+ .filteringAliases(request.filteringAliases())
+ .nowInMillis(request.nowInMillis());
+ Searcher acquireSearcher = indexShard.acquireSearcher("termsByQuery");
+ SearchContext context = new DefaultSearchContext(0, shardSearchReq,
+ shardTarget, acquireSearcher, indexService, indexShard,
scriptService, cacheRecycler);
- SearchContext.setCurrent(context);
-
- FieldMapper fieldMapper = context.smartNameFieldMapper(request.field());
- if (fieldMapper == null) {
- throw new SearchContextException(context, "field not found");
- }
-
- IndexFieldData indexFieldData = context.fieldData().getForField(fieldMapper);
try {
+ SearchContext.setCurrent(context);
+ FieldMapper fieldMapper = context.smartNameFieldMapper(request.field());
+ if (fieldMapper == null) {
+ throw new SearchContextException(context, "field not found");
+ }
+
+ IndexFieldData indexFieldData = context.fieldData().getForField(fieldMapper);
// TODO: min score should move to be "null" as a value that is not initialized...
if (request.minScore() != -1) {
context.minimumScore(request.minScore());
@@ -228,28 +228,27 @@ public class TransportTermsByQueryAction
}
context.preProcess();
- try {
- // execute the search only gathering the hit count and bitsets for each segment
- HitSetCollector termCollector = new HitSetCollector(context.searcher().getTopReaderContext().leaves().size());
- Query query = context.query();
- if (!(query instanceof ConstantScoreQuery)) {
- query = new ConstantScoreQuery(query);
- }
- context.searcher().search(query, termCollector);
-
- // gather the terms reading the values from the field data cache
- // the number of terms will be less than or equal to the total hits from the collector
- ResponseTerms responseTerms = ResponseTerms.get(termCollector, indexFieldData);
- responseTerms.process(context.searcher().getTopReaderContext().leaves());
- return new ShardTermsByQueryResponse(request.index(), request.shardId(), responseTerms);
- } catch (Exception e) {
- throw new QueryPhaseExecutionException(context, "failed to execute count", e);
+ // execute the search only gathering the hit count and bitsets for each segment
+ HitSetCollector termCollector = new HitSetCollector(context.searcher().getTopReaderContext().leaves().size());
+ Query query = context.query();
+ if (!(query instanceof ConstantScoreQuery)) {
+ query = new ConstantScoreQuery(query);
}
+ context.searcher().search(query, termCollector);
+
+ // gather the terms reading the values from the field data cache
+ // the number of terms will be less than or equal to the total hits from the collector
+ ResponseTerms responseTerms = ResponseTerms.get(termCollector, indexFieldData);
+ responseTerms.process(context.searcher().getTopReaderContext().leaves());
+ return new ShardTermsByQueryResponse(request.index(), request.shardId(), responseTerms);
+ } catch (Throwable e) {
+ logger.info("failed!!!!", e);
+ throw new QueryPhaseExecutionException(context, "failed to execute count", e);
} finally {
// this will also release the index searcher
context.release();
SearchContext.removeCurrent();
- }
+ }
}
/*
diff --git a/src/test/java/org/elasticsearch/test/integration/action/terms/SimpleTermsByQueryActionTests.java b/src/test/java/org/elasticsearch/test/integration/action/terms/SimpleTermsByQueryActionTests.java
index a82f7da..d8e1acd 100644
--- a/src/test/java/org/elasticsearch/test/integration/action/terms/SimpleTermsByQueryActionTests.java
+++ b/src/test/java/org/elasticsearch/test/integration/action/terms/SimpleTermsByQueryActionTests.java
@@ -23,10 +23,8 @@ import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.terms.ResponseTerms;
import org.elasticsearch.action.terms.TermsByQueryRequestBuilder;
import org.elasticsearch.action.terms.TermsByQueryResponse;
-import org.elasticsearch.common.Priority;
import org.elasticsearch.test.AbstractIntegrationTest;
-import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
-import org.elasticsearch.test.AbstractIntegrationTest.Scope;
+import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.junit.Test;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -35,7 +33,6 @@ import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
-@ClusterScope(scope = Scope.TEST, numNodes = 1)
public class SimpleTermsByQueryActionTests extends AbstractIntegrationTest {
protected int getNumShards() {
@@ -52,8 +49,7 @@ public class SimpleTermsByQueryActionTests extends AbstractIntegrationTest {
@Test
public void testPreJoin() throws Exception {
client().admin().indices().prepareCreate("idx").setSettings("index.number_of_shards", getNumShards()).execute().actionGet();
- client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
-
+ ensureGreen();
for (int i = 0; i < numberOfDocs(); i++) {
client().prepareIndex("idx", "type", "" + i)
.setSource(jsonBuilder().startObject().field("tag", "tag" + i).field("num", i).endObject()).execute().actionGet();
@@ -63,7 +59,7 @@ public class SimpleTermsByQueryActionTests extends AbstractIntegrationTest {
TermsByQueryResponse resp = new TermsByQueryRequestBuilder(client()).setIndices("idx").setField("tag").setQuery(matchAllQuery())
.execute().actionGet();
-
+ ElasticsearchAssertions.assertNoFailures(resp);
assertThat(resp.getResponseTerms(), notNullValue());
assertThat(resp.getResponseTerms().size(), is(numberOfDocs()));
for (int i = 0; i < numberOfDocs(); i++) {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment