package org.elasticsearch.cluster.routing.allocation;

import com.google.common.collect.ImmutableMap;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.junit.Test;

import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.Matchers.equalTo;
/**
 * Demonstrates a shard allocation issue with awareness attributes: after a
 * single node leaves and rejoins, the replica that became unassigned is not
 * reallocated to it, even though the same scenario with two nodes recovers.
 */
public class AwarenessAllocationTests2 extends ElasticsearchAllocationTestCase {
    private final ESLogger logger = Loggers.getLogger(AwarenessAllocationTests2.class);

    @Test
    public void testUnbalancedZones() {
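        // Allocation awareness on "rack_id": the deciders try to spread the
        // copies of each shard across distinct rack_id values.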
        AllocationService strategy = createAllocationService(settingsBuilder()
                .put("cluster.routing.allocation.concurrent_recoveries", 10)
                .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
                .put("cluster.routing.allocation.awareness.attributes", "rack_id")
                .build());
logger.info("Building initial routing table for 'testUnbalancedZones2'");
        MetaData metaData = MetaData.builder()
                .put(IndexMetaData.builder("test").numberOfShards(1).numberOfReplicas(4))
                .build();
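        // One index with 1 primary and 4 replicas: 5 copies of [test][0] in
        // total, one per node once all five nodes have joined.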
        RoutingTable routingTable = RoutingTable.builder()
                .addAsNew(metaData.index("test"))
                .build();

        ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
                .metaData(metaData)
                .routingTable(routingTable)
                .build();

logger.info("--> adding 5 nodes on same rack and do rerouting");
        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
                .put(newNode("A-0", ImmutableMap.of("rack_id", "a")))
                .put(newNode("A-1", ImmutableMap.of("rack_id", "a")))
                .put(newNode("A-2", ImmutableMap.of("rack_id", "a")))
                .put(newNode("A-3", ImmutableMap.of("rack_id", "a")))
                .put(newNode("B-0", ImmutableMap.of("rack_id", "b")))
        ).build();
        routingTable = strategy.reroute(clusterState).routingTable();
        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();

        // Primary should be initializing.
        assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(0));
        assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
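        // Replicas cannot initialize until their primary has started, so only
        // the primary is assigned by this first reroute.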
logger.info("--> start the shards (primaries)");
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
// All replicas should be initializing.
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(1)); // Primary
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(4));
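        // Start the four replicas.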
        routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
logger.info("--> all replicas are allocated and started since we have on node in each zone");
        assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(5));
        assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(0));
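        // Each of the five nodes now holds exactly one copy of [test][0].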

        ///////////////////////////////////////////////////////////////////////////////////////
        // If two nodes leave and come back, it allocates shards on them.
        logger.info("--> removing two nodes from cluster.");
        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
                .remove("A-2")
                .remove("A-3")
        ).build();
        routingTable = strategy.reroute(clusterState).routingTable();
        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();

        // Two replicas get unassigned.
        assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(3));
        assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2));

        // Add removed nodes back to cluster.
        logger.info("--> adding two nodes back to cluster.");
        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
                .put(newNode("A-2", ImmutableMap.of("rack_id", "a")))
                .put(newNode("A-3", ImmutableMap.of("rack_id", "a")))
        ).build();
        routingTable = strategy.reroute(clusterState).routingTable();
        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
        assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(3));
        assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));

        // Start shards.
        routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
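        // Both replicas were reallocated and started: back to five started copies.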

        ///////////////////////////////////////////////////////////////////////////////////////
        // But if only one node leaves and comes back, it doesn't allocate a shard.
        logger.info("--> removing a node from cluster.");
        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
                .remove("A-3")
        ).build();
        routingTable = strategy.reroute(clusterState).routingTable();
        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();

        // One replica gets unassigned.
        assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(4));
        assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).size(), equalTo(1));

        // Add removed node back to cluster.
        logger.info("--> adding a node back to cluster.");
        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
                .put(newNode("A-3", ImmutableMap.of("rack_id", "a")))
        ).build();
        routingTable = strategy.reroute(clusterState).routingTable();
        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
        assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(4));
        assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); // <= this fails
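        // Expected: the unassigned replica starts initializing on the rejoined A-3.
        // Actual (per the failing assertion and the dump below): it stays UNASSIGNED.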

        // routingTable.prettyPrint() shows something like:
        // -- index [test]
        // ----shard_id [test][0]
        // --------[test][0], node[A-0], [P], s[STARTED]
        // --------[test][0], node[A-1], [R], s[STARTED]
        // --------[test][0], node[B-0], [R], s[STARTED]
        // --------[test][0], node[A-2], [R], s[STARTED]
        // --------[test][0], node[null], [R], s[UNASSIGNED], unassigned_info[[reason=NODE_LEFT], at[2015-07-29T03:10:56.544Z], details[node_left[A-3]]]
    }
}