Created March 25, 2016 at 15:40.
Save gist vinothchandar/18feedfa84650e3efdc0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java | |
index a8d83a2..bfbc6d7 100644 | |
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java | |
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java | |
@@ -125,6 +125,7 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator { | |
int maxPartition = currentIdealState.getMaxPartitionsPerInstance(); | |
ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme(); | |
+ System.out.println(">>> Computing partition assignment"); | |
placementScheme.init(_manager); | |
_algorithm = | |
new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition, | |
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java | |
index 11b5b0d..eb8c367 100644 | |
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java | |
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java | |
@@ -19,19 +19,12 @@ package org.apache.helix.controller.strategy; | |
* under the License. | |
*/ | |
-import java.util.ArrayList; | |
-import java.util.Collections; | |
-import java.util.HashMap; | |
-import java.util.Iterator; | |
-import java.util.LinkedHashMap; | |
-import java.util.LinkedHashSet; | |
-import java.util.List; | |
-import java.util.Map; | |
+import java.nio.charset.Charset; | |
+import java.util.*; | |
import java.util.Map.Entry; | |
-import java.util.Set; | |
-import java.util.TreeMap; | |
-import java.util.TreeSet; | |
+import com.google.common.hash.HashFunction; | |
+import com.google.common.hash.Hashing; | |
import org.apache.helix.HelixManager; | |
import org.apache.helix.ZNRecord; | |
import org.apache.log4j.Logger; | |
@@ -112,17 +105,24 @@ public class AutoRebalanceStrategy { | |
// compute the preferred mapping if all nodes were up | |
_preferredAssignment = computePreferredPlacement(allNodes); | |
+ System.out.println(">>>> Preferred Assignment: "+ _preferredAssignment); | |
// logger.info("preferred mapping:"+ preferredAssignment); | |
// from current mapping derive the ones in preferred location | |
// this will update the nodes with their current fill status | |
_existingPreferredAssignment = computeExistingPreferredPlacement(currentMapping); | |
+ System.out.println(">>>> Existing Preferred Assignment: "+ _existingPreferredAssignment); | |
+ | |
// from current mapping derive the ones not in preferred location | |
_existingNonPreferredAssignment = computeExistingNonPreferredPlacement(currentMapping); | |
+ System.out.println(">>>> Existing Non Preferred Assignment: "+ _existingNonPreferredAssignment); | |
+ | |
// compute orphaned replicas that are not assigned to any node | |
_orphaned = computeOrphaned(); | |
+ System.out.println(">>>> Orphaned: "+ _orphaned); | |
+ | |
if (logger.isInfoEnabled()) { | |
logger.info("orphan = " + _orphaned); | |
} | |
@@ -133,6 +133,8 @@ public class AutoRebalanceStrategy { | |
moveExcessReplicas(); | |
+ | |
+ System.out.println(">>> Final State Map :"+ _stateMap); | |
prepareResult(znRecord); | |
return znRecord; | |
} | |
@@ -315,6 +317,9 @@ public class AutoRebalanceStrategy { | |
i++; | |
} | |
} | |
+ | |
+ System.out.println(">>>> Final ZK record : "+ znRecord.toString()); | |
+ | |
} | |
/** | |
@@ -535,7 +540,7 @@ public class AutoRebalanceStrategy { | |
for (int replicaId = 0; replicaId < count; replicaId++) { | |
Replica replica = new Replica(partition, replicaId); | |
String nodeName = | |
- _placementScheme.getLocation(partitionId, replicaId, _partitions.size(), numReplicas, | |
+ _placementScheme.getLocation(_resourceName, partitionId, replicaId, _partitions.size(), numReplicas, | |
allNodes); | |
preferredMapping.put(replica, _nodeMap.get(nodeName)); | |
} | |
@@ -546,7 +551,6 @@ public class AutoRebalanceStrategy { | |
/** | |
* Counts the total number of replicas given a state-count mapping | |
- * @param states | |
* @return | |
*/ | |
private int countStateReplicas() { | |
@@ -710,6 +714,8 @@ public class AutoRebalanceStrategy { | |
/** | |
* Given properties of this replica, determine the node it would prefer to be served by | |
+ * | |
+ * @param resourceName the resource name | |
* @param partitionId The current partition | |
* @param replicaId The current replica with respect to the current partition | |
* @param numPartitions The total number of partitions | |
@@ -717,7 +723,7 @@ public class AutoRebalanceStrategy { | |
* @param nodeNames A list of identifiers of all nodes, live and non-live | |
* @return The name of the node that would prefer to serve this replica | |
*/ | |
- public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas, | |
+ public String getLocation(String resourceName, int partitionId, int replicaId, int numPartitions, int numReplicas, | |
final List<String> nodeNames); | |
} | |
@@ -726,27 +732,39 @@ public class AutoRebalanceStrategy { | |
* evenly as possible while avoiding placing two replicas of the same partition on any node. | |
*/ | |
public static class DefaultPlacementScheme implements ReplicaPlacementScheme { | |
+ | |
+ // Arbitrary seed value. | |
+ private final int SEED_VALUE = 5736856; | |
+ private HashFunction _hashFn = Hashing.murmur3_128(SEED_VALUE); | |
+ | |
@Override | |
public void init(final HelixManager manager) { | |
// do nothing since this is independent of the manager | |
} | |
+ private int getNodeShift(String resourceName, int buckets) { | |
+ return Hashing.consistentHash(_hashFn.hashString(resourceName, Charset.forName("UTF-8")), buckets); | |
+ } | |
+ | |
@Override | |
- public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas, | |
+ public String getLocation(String resourceName, int partitionId, int replicaId, int numPartitions, int numReplicas, | |
final List<String> nodeNames) { | |
int index; | |
+ int nodeShift = getNodeShift(resourceName, nodeNames.size()); | |
+ | |
if (nodeNames.size() > numPartitions) { | |
// assign replicas in partition order in case there are more nodes than partitions | |
- index = (partitionId + replicaId * numPartitions) % nodeNames.size(); | |
+ index = (nodeShift + partitionId + replicaId * numPartitions) % nodeNames.size(); | |
} else if (nodeNames.size() == numPartitions) { | |
// need a replica offset in case the sizes of these sets are the same | |
index = | |
- ((partitionId + replicaId * numPartitions) % nodeNames.size() + replicaId) | |
+ ((nodeShift + partitionId + replicaId * numPartitions) % nodeNames.size() + replicaId) | |
% nodeNames.size(); | |
} else { | |
// in all other cases, assigning a replica at a time for each partition is reasonable | |
- index = (partitionId + replicaId) % nodeNames.size(); | |
+ index = (nodeShift + partitionId + replicaId) % nodeNames.size(); | |
} | |
+ System.out.format(">>>> NodeShift for %s %s is %d, index %d \n", resourceName, partitionId, nodeShift, index); | |
return nodeNames.get(index); | |
} | |
} | |
diff --git a/pom.xml b/pom.xml
index adc3b06..797e218 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,7 +29,7 @@ under the License.
   <groupId>org.apache.helix</groupId>
   <artifactId>helix</artifactId>
-  <version>0.6.5</version>
+  <version>0.6.6</version>
   <packaging>pom</packaging>
   <name>Apache Helix</name>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.