Skip to content

Instantly share code, notes, and snippets.

@inexplicable
Created May 7, 2014 22:41
Show Gist options
  • Save inexplicable/a3880e7e8880732e4def to your computer and use it in GitHub Desktop.
Save inexplicable/a3880e7e8880732e4def to your computer and use it in GitHub Desktop.
data center aware rebalance
package com.ebay.squbs.pubsub.cube
import akka.actor._
import akka.routing._
import scala.annotation.tailrec
import scala.collection.mutable
import scala.collection.immutable
import java.net._
import org.squbs.cluster.RebalanceLogic
import akka.util.ByteString
/**
* Created by huzhou on 5/6/14.
*/
class CorrelateRoundRobinRoutingLogic(zkAddress:Address) extends RoutingLogic {
val fallback = RoundRobinRoutingLogic()
override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee = {
val candidates = routees.filter({
case ActorSelectionRoutee(selection) =>
correlates(zkAddress.host.getOrElse(""), ActorPath.fromString(selection.pathString).address.host.getOrElse(""))
case _ =>
true
})
fallback.select(message, if(candidates.nonEmpty) candidates else routees)
}
def correlates(mine:String, other:String) = {
import scala.collection.JavaConversions._
if(mine == other)
true
else {
//once resolved, it should be much faster
val myipv4 = InetAddress.getByName(mine).getHostAddress
val others = InetAddress.getByName(other).getHostAddress
//this is a simple heuristics which looks at the subnet mask of 1st 2 sectors in an ipv4 address
myipv4.split('.').take(2) == others.split('.').take(2)
}
}
}
object DataCenterAwareRebalanceLogic extends RebalanceLogic {
private[DataCenterAwareRebalanceLogic] def key(address:Address) =
InetAddress.getByName(address.host.getOrElse("")).getHostAddress.split('.').take(2).mkString(".")
@tailrec private[DataCenterAwareRebalanceLogic] def correlate(members:Seq[Address], groupings:Map[String, Seq[Address]]):Map[String, Seq[Address]] =
if(members.isEmpty)
groupings
else
correlate(members.tail, groupings.updated(key(members.head), groupings.getOrElse(key(members.head), Seq.empty) :+ members.head))
def shuffle(members:Seq[Address]):Seq[Address] = {
@tailrec def crossMerge(grouped:Seq[Seq[Address]], merging:Seq[Address]):Seq[Address] =
if(grouped.isEmpty)
merging
else
grouped.head match {
case Nil => crossMerge(grouped.tail, merging)
case member :: remains => crossMerge(grouped.tail :+ remains, merging :+ member)
}
crossMerge(correlate(members, Map.empty[String, Seq[Address]]).values.to[Seq], Seq.empty[Address])
}
/**
* @return partitionsToMembers compensated when size in service is short compared with what's required
*/
override def compensate(partitionsToMembers:Map[ByteString, Set[Address]], members:Seq[Address], size:(ByteString => Int)):Map[ByteString, Set[Address]] = {
partitionsToMembers.map(assignments => {
val partitionKey = assignments._1
val servants = assignments._2
val requires = size(partitionKey)//bytesToInt(zkClient.getData.forPath(s"/sizes/${keyToPath(partitionKey)}"))
if(servants.size < requires)
partitionKey -> (servants ++ shuffle(members).filterNot(servants.contains(_)).take(requires - servants.size))
else
assignments
})
}
/**
* @return partitionsToMembers rebalanced
*/
override def rebalance(partitionsToMembers:Map[ByteString, Set[Address]], members:Set[Address]):Map[ByteString, Set[Address]] = {
def rebalanceWithinCorrelates(partitionsToMembers:Map[ByteString, Set[Address]], members:Set[Address]):Map[ByteString, Set[Address]] = {
val utilization = mutable.Map.empty[Address, Set[ByteString]]
partitionsToMembers.foreach(assignment => assignment._2.foreach(member => utilization += member -> (utilization.getOrElse(member, Set.empty) + assignment._1)))
val ordered = members.toSeq.sortWith((one, two) => utilization.getOrElse(one, Set.empty).size < utilization.getOrElse(two, Set.empty).size)
if (utilization.getOrElse(ordered.last, Set.empty).size - utilization.getOrElse(ordered.head, Set.empty).size > 1) {
val move = utilization.getOrElse(ordered.last, Set.empty).head
rebalanceWithinCorrelates(partitionsToMembers.updated(move, (partitionsToMembers.getOrElse(move, Set.empty) + ordered.head - ordered.last)), members)
}
else {
//end condition, must trigger everyone to get the changes
partitionsToMembers
}
}
var plan = partitionsToMembers
correlate(members.to[Seq], Map.empty[String, Seq[Address]]).values.foreach(correlates =>
plan = rebalanceWithinCorrelates(plan, correlates.to[Set]))
plan
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment