Skip to content

Instantly share code, notes, and snippets.

@vinothchandar
Last active March 15, 2016 20:37
Show Gist options
  • Save vinothchandar/e8837df301501f85e257 to your computer and use it in GitHub Desktop.
Save vinothchandar/e8837df301501f85e257 to your computer and use it in GitHub Desktop.
Helix Skew Patch
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index a8d83a2..bfbc6d7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -125,6 +125,7 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator {
int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+ System.out.println(">>> Computing partition assignment");
placementScheme.init(_manager);
_algorithm =
new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition,
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
index 11b5b0d..fd2d7d1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
@@ -19,19 +19,12 @@ package org.apache.helix.controller.strategy;
* under the License.
*/
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
+import java.nio.charset.Charset;
+import java.util.*;
import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.log4j.Logger;
@@ -535,7 +528,7 @@ public class AutoRebalanceStrategy {
for (int replicaId = 0; replicaId < count; replicaId++) {
Replica replica = new Replica(partition, replicaId);
String nodeName =
- _placementScheme.getLocation(partitionId, replicaId, _partitions.size(), numReplicas,
+ _placementScheme.getLocation(_resourceName, partitionId, replicaId, _partitions.size(), numReplicas,
allNodes);
preferredMapping.put(replica, _nodeMap.get(nodeName));
}
@@ -546,7 +539,6 @@ public class AutoRebalanceStrategy {
/**
* Counts the total number of replicas given a state-count mapping
- * @param states
* @return
*/
private int countStateReplicas() {
@@ -710,6 +702,8 @@ public class AutoRebalanceStrategy {
/**
* Given properties of this replica, determine the node it would prefer to be served by
+ *
+ * @param resourceName The name of the resource that owns the partition being placed
* @param partitionId The current partition
* @param replicaId The current replica with respect to the current partition
* @param numPartitions The total number of partitions
@@ -717,7 +711,7 @@ public class AutoRebalanceStrategy {
* @param nodeNames A list of identifiers of all nodes, live and non-live
* @return The name of the node that would prefer to serve this replica
*/
- public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas,
+ public String getLocation(String resourceName, int partitionId, int replicaId, int numPartitions, int numReplicas,
final List<String> nodeNames);
}
@@ -726,27 +720,39 @@ public class AutoRebalanceStrategy {
* evenly as possible while avoiding placing two replicas of the same partition on any node.
*/
public static class DefaultPlacementScheme implements ReplicaPlacementScheme {
+
+ // Arbitrary but fixed hash seed; changing it would change the node shift computed for each resource.
+ private final int SEED_VALUE = 5736856;
+ private HashFunction _hashFn = Hashing.murmur3_128(SEED_VALUE);
+
@Override
public void init(final HelixManager manager) {
// do nothing since this is independent of the manager
}
+ private int getNodeShift(String resourceName, int buckets) {
+ return Hashing.consistentHash(_hashFn.hashString(resourceName, Charset.forName("UTF-8")), buckets);
+ }
+
@Override
- public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas,
+ public String getLocation(String resourceName, int partitionId, int replicaId, int numPartitions, int numReplicas,
final List<String> nodeNames) {
int index;
+ int nodeShift = getNodeShift(resourceName, nodeNames.size());
+
if (nodeNames.size() > numPartitions) {
// assign replicas in partition order in case there are more nodes than partitions
- index = (partitionId + replicaId * numPartitions) % nodeNames.size();
+ index = (nodeShift + partitionId + replicaId * numPartitions) % nodeNames.size();
} else if (nodeNames.size() == numPartitions) {
// need a replica offset in case the sizes of these sets are the same
index =
- ((partitionId + replicaId * numPartitions) % nodeNames.size() + replicaId)
+ ((nodeShift + partitionId + replicaId * numPartitions) % nodeNames.size() + replicaId)
% nodeNames.size();
} else {
// in all other cases, assigning a replica at a time for each partition is reasonable
- index = (partitionId + replicaId) % nodeNames.size();
+ index = (nodeShift + partitionId + replicaId) % nodeNames.size();
}
+ System.out.format(">>>> NodeShift for %s %s is %d, index %d \n", resourceName, partitionId, nodeShift, index);
return nodeNames.get(index);
}
}
diff --git a/pom.xml b/pom.xml
index adc3b06..797e218 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,7 +29,7 @@ under the License.
<groupId>org.apache.helix</groupId>
<artifactId>helix</artifactId>
- <version>0.6.5</version>
+ <version>0.6.6</version>
<packaging>pom</packaging>
<name>Apache Helix</name>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment