Skip to content

Instantly share code, notes, and snippets.

@dreamer-89
Created August 9, 2022 18:41
Show Gist options
  • Save dreamer-89/9e6a781e57dbf1008c97034103b6a9a8 to your computer and use it in GitHub Desktop.
Save dreamer-89/9e6a781e57dbf1008c97034103b6a9a8 to your computer and use it in GitHub Desktop.
test PrimaryShardAllocator chooses furthest ahead replica
/**
 * Tests whether primary shard allocation via PrimaryShardAllocator chooses the replica with the
 * furthest-ahead ReplicationCheckpoint as the new primary once the original primary node is stopped.
 *
 * <p>Setup: 1 cluster-manager node, 1 primary data node, 2 replica data nodes, segment replication
 * enabled. The first replica is restarted while more docs are indexed so the second replica's
 * checkpoint advances past the first replica's.
 *
 * @throws Exception if node start/restart or indexing fails
 */
public void testPrimaryShardAllocatorUsesFurthestAheadReplica() throws Exception {
    final Settings settings = Settings.builder()
        .put(indexSettings())
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
        .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
        .build();
    final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(Settings.EMPTY);
    final String primaryNode = internalCluster().startDataOnlyNode(Settings.EMPTY);
    createIndex(INDEX_NAME, settings);
    final String firstReplica = internalCluster().startDataOnlyNode(Settings.EMPTY);
    final String secondReplica = internalCluster().startDataOnlyNode(Settings.EMPTY);
    // Index docs & refresh to bring all replicas to an initial common checkpoint.
    indexDocs(scaledRandomIntBetween(20, 200));
    refresh(INDEX_NAME);
    logger.info("--> restarting first replica node");
    // Index/refresh again, then restart the first replica, so the second replica's
    // replication checkpoint ends up ahead of the first replica's.
    indexDocsAndRefresh(scaledRandomIntBetween(20, 200));
    logger.info("--> firstReplica RC {}", getIndexShard(firstReplica).getLatestReplicationCheckpoint());
    internalCluster().restartNode(firstReplica);
    ensureGreen(INDEX_NAME);
    // Restart primary node and ensure secondReplica is chosen as the new primary.
    logger.info("--> firstReplica RC {}", getIndexShard(firstReplica).getLatestReplicationCheckpoint());
    logger.info("--> primaryShard RC {}", getIndexShard(primaryNode).getLatestReplicationCheckpoint());
    logger.info("--> secondReplica RC {}", getIndexShard(secondReplica).getLatestReplicationCheckpoint());
    logger.info("--> cluster state before restarting replicas {}", client().admin().cluster().prepareState().get().getState());
    restartNode(firstReplica);
    restartNode(secondReplica);
    logger.info("--> cluster state after restarting replicas {}", client().admin().cluster().prepareState().get().getState());
    internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));
    // Fetch cluster state only after the primary is stopped; the earlier fetch-and-discard
    // before any restart was a dead store and has been removed.
    final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
    logger.info("--> cluster state 3 {}", clusterState);
    ensureYellow(INDEX_NAME); // --> times out as there are no in-sync Alloc Id to recover from
}
/**
 * Indexes {@code docCount} documents into {@code INDEX_NAME} via a {@link BackgroundIndexer},
 * blocks until all of them are visible, then refreshes the index.
 *
 * <p>The indexer is opened in try-with-resources so its worker threads are always stopped,
 * even if the wait fails.
 *
 * @param docCount number of documents to index before returning
 * @throws Exception if indexing fails or the wait for docs is interrupted
 */
private void indexDocs(int docCount) throws Exception {
    try (
        BackgroundIndexer backgroundIndexer = new BackgroundIndexer(
            INDEX_NAME,
            "_doc",
            client(),
            -1,
            RandomizedTest.scaledRandomIntBetween(2, 5),
            false,
            random()
        )
    ) {
        backgroundIndexer.start(docCount);
        waitForDocs(docCount, backgroundIndexer);
        refresh(INDEX_NAME);
    }
}
/**
 * Indexes {@code docCount} documents and refreshes the index, rethrowing any checked exception
 * as a {@link RuntimeException} so callers need not declare {@code throws}.
 *
 * <p>NOTE(review): this runs synchronously on the calling thread — the original wrapped the call
 * in a {@code Runnable} but invoked it with {@code run()} (not on a new thread), so the wrapper
 * was misleading and has been removed. Behavior is unchanged. {@code indexDocs} already performs
 * the refresh.
 *
 * @param docCount number of documents to index
 */
private void indexDocsAndRefresh(int docCount) {
    try {
        indexDocs(docCount);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * Restarts the named node, rethrowing any checked exception as a {@link RuntimeException} so
 * callers need not declare {@code throws}.
 *
 * <p>NOTE(review): this runs synchronously on the calling thread — the original wrapped the call
 * in a {@code Runnable} but invoked it with {@code run()} (not on a new thread), so the wrapper
 * was misleading and has been removed. Behavior is unchanged.
 *
 * @param nodeName name of the node to restart
 */
private void restartNode(String nodeName) {
    try {
        internalCluster().restartNode(nodeName);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment