Created
May 7, 2014 22:41
-
-
Save inexplicable/a3880e7e8880732e4def to your computer and use it in GitHub Desktop.
data center aware rebalance
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ebay.squbs.pubsub.cube | |
import akka.actor._ | |
import akka.routing._ | |
import scala.annotation.tailrec | |
import scala.collection.mutable | |
import scala.collection.immutable | |
import java.net._ | |
import org.squbs.cluster.RebalanceLogic | |
import akka.util.ByteString | |
/** | |
* Created by huzhou on 5/6/14. | |
*/ | |
class CorrelateRoundRobinRoutingLogic(zkAddress:Address) extends RoutingLogic { | |
val fallback = RoundRobinRoutingLogic() | |
override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee = { | |
val candidates = routees.filter({ | |
case ActorSelectionRoutee(selection) => | |
correlates(zkAddress.host.getOrElse(""), ActorPath.fromString(selection.pathString).address.host.getOrElse("")) | |
case _ => | |
true | |
}) | |
fallback.select(message, if(candidates.nonEmpty) candidates else routees) | |
} | |
def correlates(mine:String, other:String) = { | |
import scala.collection.JavaConversions._ | |
if(mine == other) | |
true | |
else { | |
//once resolved, it should be much faster | |
val myipv4 = InetAddress.getByName(mine).getHostAddress | |
val others = InetAddress.getByName(other).getHostAddress | |
//this is a simple heuristics which looks at the subnet mask of 1st 2 sectors in an ipv4 address | |
myipv4.split('.').take(2) == others.split('.').take(2) | |
} | |
} | |
} | |
object DataCenterAwareRebalanceLogic extends RebalanceLogic { | |
private[DataCenterAwareRebalanceLogic] def key(address:Address) = | |
InetAddress.getByName(address.host.getOrElse("")).getHostAddress.split('.').take(2).mkString(".") | |
@tailrec private[DataCenterAwareRebalanceLogic] def correlate(members:Seq[Address], groupings:Map[String, Seq[Address]]):Map[String, Seq[Address]] = | |
if(members.isEmpty) | |
groupings | |
else | |
correlate(members.tail, groupings.updated(key(members.head), groupings.getOrElse(key(members.head), Seq.empty) :+ members.head)) | |
def shuffle(members:Seq[Address]):Seq[Address] = { | |
@tailrec def crossMerge(grouped:Seq[Seq[Address]], merging:Seq[Address]):Seq[Address] = | |
if(grouped.isEmpty) | |
merging | |
else | |
grouped.head match { | |
case Nil => crossMerge(grouped.tail, merging) | |
case member :: remains => crossMerge(grouped.tail :+ remains, merging :+ member) | |
} | |
crossMerge(correlate(members, Map.empty[String, Seq[Address]]).values.to[Seq], Seq.empty[Address]) | |
} | |
/** | |
* @return partitionsToMembers compensated when size in service is short compared with what's required | |
*/ | |
override def compensate(partitionsToMembers:Map[ByteString, Set[Address]], members:Seq[Address], size:(ByteString => Int)):Map[ByteString, Set[Address]] = { | |
partitionsToMembers.map(assignments => { | |
val partitionKey = assignments._1 | |
val servants = assignments._2 | |
val requires = size(partitionKey)//bytesToInt(zkClient.getData.forPath(s"/sizes/${keyToPath(partitionKey)}")) | |
if(servants.size < requires) | |
partitionKey -> (servants ++ shuffle(members).filterNot(servants.contains(_)).take(requires - servants.size)) | |
else | |
assignments | |
}) | |
} | |
/** | |
* @return partitionsToMembers rebalanced | |
*/ | |
override def rebalance(partitionsToMembers:Map[ByteString, Set[Address]], members:Set[Address]):Map[ByteString, Set[Address]] = { | |
def rebalanceWithinCorrelates(partitionsToMembers:Map[ByteString, Set[Address]], members:Set[Address]):Map[ByteString, Set[Address]] = { | |
val utilization = mutable.Map.empty[Address, Set[ByteString]] | |
partitionsToMembers.foreach(assignment => assignment._2.foreach(member => utilization += member -> (utilization.getOrElse(member, Set.empty) + assignment._1))) | |
val ordered = members.toSeq.sortWith((one, two) => utilization.getOrElse(one, Set.empty).size < utilization.getOrElse(two, Set.empty).size) | |
if (utilization.getOrElse(ordered.last, Set.empty).size - utilization.getOrElse(ordered.head, Set.empty).size > 1) { | |
val move = utilization.getOrElse(ordered.last, Set.empty).head | |
rebalanceWithinCorrelates(partitionsToMembers.updated(move, (partitionsToMembers.getOrElse(move, Set.empty) + ordered.head - ordered.last)), members) | |
} | |
else { | |
//end condition, must trigger everyone to get the changes | |
partitionsToMembers | |
} | |
} | |
var plan = partitionsToMembers | |
correlate(members.to[Seq], Map.empty[String, Seq[Address]]).values.foreach(correlates => | |
plan = rebalanceWithinCorrelates(plan, correlates.to[Set])) | |
plan | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment