Created
August 9, 2022 18:41
-
-
Save dreamer-89/9e6a781e57dbf1008c97034103b6a9a8 to your computer and use it in GitHub Desktop.
test PrimaryShardAllocator chooses furthest ahead replica
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Tests whether primary shard allocation via PrimaryShardAllocation chooses the replica with further ahead | |
* ReplicationCheckpoint | |
*/ | |
public void testPrimaryShardAllocatorUsesFurthestAheadReplica() throws Exception { | |
final Settings settings = Settings.builder() | |
.put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2) | |
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) | |
.build(); | |
final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(Settings.EMPTY); | |
final String primaryNode = internalCluster().startDataOnlyNode(Settings.EMPTY); | |
createIndex(INDEX_NAME, settings); | |
final String firstReplica = internalCluster().startDataOnlyNode(Settings.EMPTY); | |
final String secondReplica = internalCluster().startDataOnlyNode(Settings.EMPTY); | |
// Index docs & refresh to bring all replicas to initial checkpoint | |
indexDocs(scaledRandomIntBetween(20, 200)); | |
refresh(INDEX_NAME); | |
ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); | |
logger.info("--> restarting first replica node"); | |
// restart first replica & index/refresh in background to advance 2nd replica | |
indexDocsAndRefresh(scaledRandomIntBetween(20, 200)); | |
logger.info("--> firstReplica RC {}", getIndexShard(firstReplica).getLatestReplicationCheckpoint()); | |
internalCluster().restartNode(firstReplica); | |
ensureGreen(INDEX_NAME); | |
// Restart primary node and ensure secondReplica is chosen as new primary | |
logger.info("--> firstReplica RC {}", getIndexShard(firstReplica).getLatestReplicationCheckpoint()); | |
logger.info("--> primaryShard RC {}", getIndexShard(primaryNode).getLatestReplicationCheckpoint()); | |
logger.info("--> secondReplica RC {}", getIndexShard(secondReplica).getLatestReplicationCheckpoint()); | |
logger.info("--> cluster state before restarting replicas {}", client().admin().cluster().prepareState().get().getState()); | |
restartNode(firstReplica); | |
restartNode(secondReplica); | |
logger.info("--> cluster state after restarting replicas {}", client().admin().cluster().prepareState().get().getState()); | |
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); | |
clusterState = client().admin().cluster().prepareState().get().getState(); | |
logger.info("--> cluster state 3 {}", clusterState); | |
ensureYellow(INDEX_NAME); // --> times out as there are no in-sync Alloc Id to recover from | |
} | |
private void indexDocs(int docCount) throws Exception { | |
try ( | |
BackgroundIndexer indexer = new BackgroundIndexer( | |
INDEX_NAME, | |
"_doc", | |
client(), | |
-1, | |
RandomizedTest.scaledRandomIntBetween(2, 5), | |
false, | |
random() | |
) | |
) { | |
indexer.start(docCount); | |
waitForDocs(docCount, indexer); | |
refresh(INDEX_NAME); | |
} | |
} | |
private void indexDocsAndRefresh(int docCount) { | |
Runnable asyncIndexer = () -> { | |
try { | |
indexDocs(docCount); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
}; | |
asyncIndexer.run(); | |
} | |
private void restartNode(String nodeName) { | |
Runnable t1 = () -> { | |
try { | |
internalCluster().restartNode(nodeName); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
}; | |
t1.run(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment