Skip to content

Instantly share code, notes, and snippets.

@ywelsch
Created May 20, 2022 09:46
Show Gist options
  • Save ywelsch/75511f79665edaff349eb4d363d21200 to your computer and use it in GitHub Desktop.
Save ywelsch/75511f79665edaff349eb4d363d21200 to your computer and use it in GitHub Desktop.
testNoPartialSearchesDuringRollingRestart
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.search;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.WildcardQueryBuilder;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
/**
 * Disruption-style test that restarts nodes while many threads search concurrently, to see whether search behaves well
 * under extreme conditions. Every search response is expected to report the complete set of indexed documents.
 */
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, minNumDataNodes = 2)
public class SearchWithFailingNodesIT extends ESIntegTestCase {

    /** Number of concurrently running search threads. */
    private static final int SEARCH_THREADS = 100;

    /** Requested page size of every search. */
    private static final int PAGE_SIZE = 100;

    /** Number of node restarts performed while the searches are running. */
    private static final int NODE_RESTARTS = 5;

    /** Upper bound on how long a single search request is waited for. */
    private static final TimeValue SEARCH_TIMEOUT = TimeValue.timeValueSeconds(10);

    /** How long to wait for a search thread to terminate before logging a warning. */
    private static final long JOIN_TIMEOUT_MILLIS = 30000;

    /**
     * Indexes a random number of documents into index {@code test}, then restarts randomly chosen nodes (never the
     * coordinating node) one at a time while background threads continuously search through the coordinating node and
     * verify that every response contains the full hit count.
     *
     * @throws Exception if cluster setup, a node restart or waiting for the search threads fails
     */
    public void testNoPartialSearchesDuringRollingRestart() throws Exception {
        int docCount = scaledRandomIntBetween(10, 1000);
        logger.info("Using docCount [{}]", docCount);
        assertAcked(
            prepareCreate("test").setSettings(
                Settings.builder()
                    .put("index.number_of_shards", cluster().numDataNodes() + 2)
                    .put("index.number_of_replicas", 1)
            )
        );
        ensureGreen();
        for (int i = 0; i < docCount; i++) {
            client().prepareIndex("test").setId("" + i).setSource("field1", "i" + i).get();
        }
        refresh("test");

        // pick a node for coordinating search requests: We're not going to restart this node
        String[] nodeNames = internalCluster().getNodeNames();
        String coordinatingNode = randomFrom(nodeNames);

        AtomicBoolean stop = new AtomicBoolean();
        List<Thread> searchThreads = new ArrayList<>(SEARCH_THREADS);
        // Threads are started inside the try block so that the finally clause stops and joins whatever was started,
        // even if creating or starting one of them fails.
        try {
            // this is a little extreme, but necessary to provoke spectacular timings like hitting a recovery on a replica
            for (int i = 0; i < SEARCH_THREADS; ++i) {
                Thread searchThread = new Thread(() -> searchUntilStopped(stop, coordinatingNode, docCount));
                searchThread.setDaemon(true);
                searchThreads.add(searchThread);
                searchThread.start();
            }
            for (int i = 0; i < NODE_RESTARTS; ++i) {
                String otherNode = randomValueOtherThan(coordinatingNode, () -> randomFrom(internalCluster().getNodeNames()));
                internalCluster().restartNode(otherNode);
                ensureGreen("test");
            }
        } finally {
            stop.set(true);
            joinSearchThreads(searchThreads);
        }
    }

    /**
     * Body of a search thread: alternates a wildcard search and a search without an explicit query until {@code stop}
     * becomes {@code true}, verifying each response.
     *
     * @param stop             flag that terminates the loop once set
     * @param coordinatingNode name of the node whose client sends the search requests
     * @param docCount         number of documents that were indexed
     */
    private void searchUntilStopped(AtomicBoolean stop, String coordinatingNode, int docCount) {
        while (stop.get() == false) {
            // todo: the timeouts below should not be necessary, but this test sometimes hangs without, to be fixed (or
            // explained)
            verifyCompleteResults(
                () -> client(coordinatingNode).prepareSearch("test")
                    .setQuery(new WildcardQueryBuilder("field1", "i*"))
                    .setSearchType(randomFrom(SearchType.values()))
                    .setSize(PAGE_SIZE)
                    .setAllowPartialSearchResults(randomBoolean())
                    .get(SEARCH_TIMEOUT),
                docCount
            );
            verifyCompleteResults(
                () -> client(coordinatingNode).prepareSearch("test")
                    .setSearchType(randomFrom(SearchType.values()))
                    .setSize(PAGE_SIZE)
                    .setAllowPartialSearchResults(randomBoolean())
                    .get(SEARCH_TIMEOUT),
                docCount
            );
        }
    }

    /**
     * Executes the given search and asserts that it returned a full page of hits and the complete total hit count.
     *
     * @param call     supplier that executes the search and returns its response
     * @param docCount number of documents that were indexed
     * @throws AssertionError if the search throws an exception (kept as the cause) or the hit counts do not match
     */
    private void verifyCompleteResults(Supplier<SearchResponse> call, int docCount) {
        try {
            SearchResponse response = call.get();
            assertThat(response.getHits().getHits().length, equalTo(Math.min(PAGE_SIZE, docCount)));
            assertThat(response.getHits().getTotalHits().value, equalTo((long) docCount));
        } catch (Exception e) {
            logger.info("Failed search", e);
            // keep the original exception as the cause so that the test failure itself carries the stack trace
            throw new AssertionError("Unexpected search failure: " + e, e);
        }
    }

    /**
     * Waits for all given threads to terminate. A thread that is still alive after the join timeout is logged and then
     * waited for indefinitely.
     *
     * @param searchThreads the threads to wait for
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private void joinSearchThreads(List<Thread> searchThreads) throws InterruptedException {
        for (Thread thread : searchThreads) {
            thread.join(JOIN_TIMEOUT_MILLIS);
            if (thread.isAlive()) {
                logger.warn("Thread: {} is still alive", thread);
                // do not continue unless thread terminates to avoid getting other confusing test errors. Please kill me...
                thread.join();
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment