-
-
Save masaruh/63db5a030220a26cc2a8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.elasticsearch.cluster.routing.allocation; | |
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.junit.Test;

import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.Matchers.equalTo;
/** | |
*/ | |
public class AwarenessAllocationTests2 extends ElasticsearchAllocationTestCase { | |
private final ESLogger logger = Loggers.getLogger(AwarenessAllocationTests2.class); | |
@Test | |
public void testUnbalancedZones() { | |
AllocationService strategy = createAllocationService(settingsBuilder() | |
.put("cluster.routing.allocation.concurrent_recoveries", 10) | |
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always") | |
.put("cluster.routing.allocation.awareness.attributes", "rack_id") | |
.build()); | |
logger.info("Building initial routing table for 'testUnbalancedZones2'"); | |
MetaData metaData = MetaData.builder() | |
.put(IndexMetaData.builder("test").numberOfShards(1).numberOfReplicas(4)) | |
.build(); | |
RoutingTable routingTable = RoutingTable.builder() | |
.addAsNew(metaData.index("test")) | |
.build(); | |
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build(); | |
logger.info("--> adding 5 nodes on same rack and do rerouting"); | |
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() | |
.put(newNode("A-0", ImmutableMap.of("rack_id", "a"))) | |
.put(newNode("A-1", ImmutableMap.of("rack_id", "a"))) | |
.put(newNode("A-2", ImmutableMap.of("rack_id", "a"))) | |
.put(newNode("A-3", ImmutableMap.of("rack_id", "a"))) | |
.put(newNode("B-0", ImmutableMap.of("rack_id", "b"))) | |
).build(); | |
routingTable = strategy.reroute(clusterState).routingTable(); | |
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); | |
// Primary should be initializing. | |
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(0)); | |
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); | |
logger.info("--> start the shards (primaries)"); | |
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); | |
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); | |
// All replicas should be initializing. | |
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(1)); // Primary | |
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(4)); | |
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); | |
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); | |
logger.info("--> all replicas are allocated and started since we have on node in each zone"); | |
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(5)); | |
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(0)); | |
/////////////////////////////////////////////////////////////////////////////////////// | |
// If two nodes leave and come back, it allocates shards on them. | |
logger.info("--> removing two nodes from cluster."); | |
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()) | |
.remove("A-2") | |
.remove("A-3") | |
).build(); | |
routingTable = strategy.reroute(clusterState).routingTable(); | |
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); | |
// Two replicas get unassigned. | |
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(3)); | |
assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2)); | |
// Add removed nodes back to cluster. | |
logger.info("--> adding two nodes back to cluster."); | |
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()) | |
.put(newNode("A-2", ImmutableMap.of("rack_id", "a"))) | |
.put(newNode("A-3", ImmutableMap.of("rack_id", "a"))) | |
).build(); | |
routingTable = strategy.reroute(clusterState).routingTable(); | |
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); | |
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(3)); | |
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(2)); | |
// Start shards. | |
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); | |
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); | |
/////////////////////////////////////////////////////////////////////////////////////// | |
// But if only one node leaves and comes back, it doesn't allocate a shard. | |
logger.info("--> removing a node from cluster."); | |
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()) | |
.remove("A-3") | |
).build(); | |
routingTable = strategy.reroute(clusterState).routingTable(); | |
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); | |
// One replica gets unassigned. | |
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(4)); | |
assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).size(), equalTo(1)); | |
// Add removed node back to cluster. | |
logger.info("--> adding a node back to cluster."); | |
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()) | |
.put(newNode("A-3", ImmutableMap.of("rack_id", "a"))) | |
).build(); | |
routingTable = strategy.reroute(clusterState).routingTable(); | |
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); | |
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(4)); | |
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); // <= this fails | |
// routingTable.prettyPrint() shows something like: | |
// -- index [test] | |
// ----shard_id [test][0] | |
// --------[test][0], node[A-0], [P], s[STARTED] | |
// --------[test][0], node[A-1], [R], s[STARTED] | |
// --------[test][0], node[B-0], [R], s[STARTED] | |
// --------[test][0], node[A-2], [R], s[STARTED] | |
// --------[test][0], node[null], [R], s[UNASSIGNED], unassigned_info[[reason=NODE_LEFT], at[2015-07-29T03:10:56.544Z], details[node_left[A-3]]] | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment