Created
May 20, 2022 09:46
-
-
Save ywelsch/75511f79665edaff349eb4d363d21200 to your computer and use it in GitHub Desktop.
testNoPartialSearchesDuringRollingRestart
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | |
* or more contributor license agreements. Licensed under the Elastic License | |
* 2.0 and the Server Side Public License, v 1; you may not use this file except | |
* in compliance with, at your election, the Elastic License 2.0 or the Server | |
* Side Public License, v 1. | |
*/ | |
package org.elasticsearch.search; | |
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.WildcardQueryBuilder;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
/** | |
* This test is a disruption style test that restarts data nodes to see if search behaves well under extreme conditions. | |
*/ | |
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, minNumDataNodes = 2) | |
public class SearchWithFailingNodesIT extends ESIntegTestCase { | |
public void testNoPartialSearchesDuringRollingRestart() throws Exception { | |
int docCount = scaledRandomIntBetween(10, 1000); | |
logger.info("Using docCount [{}]", docCount); | |
assertAcked(prepareCreate("test").setSettings(Settings.builder() | |
.put("index.number_of_shards", cluster().numDataNodes() + 2).put("index.number_of_replicas", 1))); | |
ensureGreen(); | |
for (int i = 0; i < docCount; i++) { | |
client().prepareIndex("test").setId(""+i).setSource("field1", "i" + i).get(); | |
} | |
refresh("test"); | |
// pick a node for coordinating search requests: We're not going to restart this node | |
String[] nodeNames = internalCluster().getNodeNames(); | |
String coordinatingNode = randomFrom(nodeNames); | |
AtomicBoolean stop = new AtomicBoolean(); | |
List<Thread> searchThreads = new ArrayList<>(); | |
// this is a little extreme, but necessary to provoke spectacular timings like hitting a recovery on a replica | |
for (int i = 0; i < 100; ++i) { | |
Thread searchThread = new Thread() { | |
{ | |
setDaemon(true); | |
} | |
@Override | |
public void run() { | |
while (stop.get() == false) { | |
// todo: the timeouts below should not be necessary, but this test sometimes hangs without, to be fixed (or | |
// explained) | |
verify(() -> client(coordinatingNode).prepareSearch("test").setQuery(new WildcardQueryBuilder("field1", | |
"i*")) | |
.setSearchType(randomFrom(SearchType.values())).setSize(100) | |
.setAllowPartialSearchResults(randomBoolean()).get(TimeValue.timeValueSeconds(10))); | |
verify(() -> client(coordinatingNode).prepareSearch("test").setSearchType(randomFrom(SearchType.values())) | |
.setSize(100).setAllowPartialSearchResults(randomBoolean()).get(TimeValue.timeValueSeconds(10))); | |
} | |
} | |
void verify(Supplier<SearchResponse> call) { | |
try { | |
SearchResponse response = call.get(); | |
assertThat(response.getHits().getHits().length, equalTo(Math.min(100, docCount))); | |
assertThat(response.getHits().getTotalHits().value, equalTo((long) docCount)); | |
} catch (Exception e) { | |
logger.info("Failed search", e); | |
fail("Unexpected search failure: " + e); | |
} | |
} | |
}; | |
searchThreads.add(searchThread); | |
searchThread.start(); | |
} | |
try { | |
for (int i = 0; i < 5; ++i) { | |
String otherNode = randomValueOtherThan(coordinatingNode, () -> randomFrom(internalCluster().getNodeNames())); | |
internalCluster().restartNode(otherNode); | |
ensureGreen("test"); | |
} | |
} finally { | |
stop.set(true); | |
searchThreads.forEach(thread -> { | |
try { | |
thread.join(30000); | |
if (thread.isAlive()) { | |
logger.warn("Thread: " + thread + " is still alive"); | |
// do not continue unless thread terminates to avoid getting other confusing test errors. Please kill me... | |
thread.join(); | |
} | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
}); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment