Last active
March 15, 2016 20:37
-
-
Save vinothchandar/e8837df301501f85e257 to your computer and use it in GitHub Desktop.
Helix Skew Patch
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index a8d83a2..bfbc6d7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -125,6 +125,9 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator {
     int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
 
     ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+    // Trace through log4j (already a helix-core dependency) instead of writing to stdout.
+    org.apache.log4j.Logger.getLogger(AutoRebalancer.class).debug(
+        "Computing partition assignment for resource " + resourceName);
     placementScheme.init(_manager);
     _algorithm =
         new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition,
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java | |
index 11b5b0d..fd2d7d1 100644 | |
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java | |
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java | |
@@ -19,19 +19,12 @@ package org.apache.helix.controller.strategy; | |
* under the License. | |
*/ | |
-import java.util.ArrayList; | |
-import java.util.Collections; | |
-import java.util.HashMap; | |
-import java.util.Iterator; | |
-import java.util.LinkedHashMap; | |
-import java.util.LinkedHashSet; | |
-import java.util.List; | |
-import java.util.Map; | |
+import java.nio.charset.Charset; | |
+import java.util.*; | |
import java.util.Map.Entry; | |
-import java.util.Set; | |
-import java.util.TreeMap; | |
-import java.util.TreeSet; | |
+import com.google.common.hash.HashFunction; | |
+import com.google.common.hash.Hashing; | |
import org.apache.helix.HelixManager; | |
import org.apache.helix.ZNRecord; | |
import org.apache.log4j.Logger; | |
@@ -535,7 +528,7 @@ public class AutoRebalanceStrategy { | |
for (int replicaId = 0; replicaId < count; replicaId++) { | |
Replica replica = new Replica(partition, replicaId); | |
String nodeName = | |
- _placementScheme.getLocation(partitionId, replicaId, _partitions.size(), numReplicas, | |
+ _placementScheme.getLocation(_resourceName, partitionId, replicaId, _partitions.size(), numReplicas, | |
allNodes); | |
preferredMapping.put(replica, _nodeMap.get(nodeName)); | |
} | |
@@ -546,7 +539,6 @@ public class AutoRebalanceStrategy { | |
/** | |
* Counts the total number of replicas given a state-count mapping | |
- * @param states | |
* @return | |
*/ | |
private int countStateReplicas() { | |
@@ -710,6 +702,8 @@ public class AutoRebalanceStrategy { | |
/** | |
* Given properties of this replica, determine the node it would prefer to be served by | |
+ * | |
+ * @param resourceName the resource name | |
* @param partitionId The current partition | |
* @param replicaId The current replica with respect to the current partition | |
* @param numPartitions The total number of partitions | |
@@ -717,7 +711,7 @@ public class AutoRebalanceStrategy { | |
* @param nodeNames A list of identifiers of all nodes, live and non-live | |
* @return The name of the node that would prefer to serve this replica | |
*/ | |
- public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas, | |
+ public String getLocation(String resourceName, int partitionId, int replicaId, int numPartitions, int numReplicas, | |
final List<String> nodeNames); | |
} | |
@@ -726,27 +720,48 @@ public class AutoRebalanceStrategy {
    * evenly as possible while avoiding placing two replicas of the same partition on any node.
    */
   public static class DefaultPlacementScheme implements ReplicaPlacementScheme {
+    // Arbitrary but fixed seed. Changing it changes every resource's node shift and therefore
+    // reshuffles the preferred placement of every resource, so keep it stable.
+    private static final int SEED_VALUE = 5736856;
+    private static final HashFunction HASH_FUNCTION = Hashing.murmur3_128(SEED_VALUE);
+    private static final Charset UTF_8 = Charset.forName("UTF-8");
+
     @Override
     public void init(final HelixManager manager) {
       // do nothing since this is independent of the manager
     }
 
+    /**
+     * Computes a per-resource rotation applied to every node index this scheme produces, so that
+     * resources with the same partition count do not all receive identical placements.
+     * @param resourceName the resource whose replicas are being placed
+     * @param buckets the number of candidate nodes; must be positive
+     * @return a value in {@code [0, buckets)} that depends only on the resource name and buckets
+     */
+    private static int getNodeShift(String resourceName, int buckets) {
+      return Hashing.consistentHash(HASH_FUNCTION.hashString(resourceName, UTF_8), buckets);
+    }
+
     @Override
-    public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas,
-        final List<String> nodeNames) {
+    public String getLocation(String resourceName, int partitionId, int replicaId,
+        int numPartitions, int numReplicas, final List<String> nodeNames) {
       int index;
+      // Every replica of this resource is rotated by the same amount modulo the node count. That
+      // is a bijection on node indices, so replicas that land on distinct nodes without the shift
+      // still land on distinct nodes with it.
+      int nodeShift = getNodeShift(resourceName, nodeNames.size());
       if (nodeNames.size() > numPartitions) {
         // assign replicas in partition order in case there are more nodes than partitions
-        index = (partitionId + replicaId * numPartitions) % nodeNames.size();
+        index = (nodeShift + partitionId + replicaId * numPartitions) % nodeNames.size();
       } else if (nodeNames.size() == numPartitions) {
         // need a replica offset in case the sizes of these sets are the same
         index =
-            ((partitionId + replicaId * numPartitions) % nodeNames.size() + replicaId)
+            ((nodeShift + partitionId + replicaId * numPartitions) % nodeNames.size() + replicaId)
                 % nodeNames.size();
       } else {
         // in all other cases, assigning a replica at a time for each partition is reasonable
-        index = (partitionId + replicaId) % nodeNames.size();
+        index = (nodeShift + partitionId + replicaId) % nodeNames.size();
       }
       return nodeNames.get(index);
     }
   }
diff --git a/pom.xml b/pom.xml | |
index adc3b06..797e218 100644 | |
--- a/pom.xml | |
+++ b/pom.xml | |
@@ -29,7 +29,7 @@ under the License. | |
<groupId>org.apache.helix</groupId> | |
<artifactId>helix</artifactId> | |
- <version>0.6.5</version> | |
+ <version>0.6.6</version> | |
<packaging>pom</packaging> | |
<name>Apache Helix</name> | |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.