Skip to content

Instantly share code, notes, and snippets.

@mardambey
Created August 23, 2012 02:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mardambey/3431451 to your computer and use it in GitHub Desktop.
Save mardambey/3431451 to your computer and use it in GitHub Desktop.
An Akka router that sends messages to its routees based on a consistent hash.
/** Companion factory for [[ConsistentHashRouter]]. */
object ConsistentHashRouter {

  /**
   * Builds a router over the given routees.
   *
   * Each routee is handed to the router as the string form of its actor path;
   * every other router setting keeps its default value.
   *
   * @param routees the actors that messages should be routed to
   * @return a router configured with the paths of `routees`
   */
  def apply(routees: Iterable[ActorRef]): ConsistentHashRouter = {
    val routeePaths = routees.map(ref => ref.path.toString)
    new ConsistentHashRouter(routees = routeePaths)
  }
}
/**
 * Router configuration that picks a routee for each message using the
 * consistent-hash logic mixed in from [[ConsistentHashLike]].
 *
 * @param nrOfInstances      number of routees; used to size the hash ring when
 *                           `routees` is empty
 * @param routees            string paths of the routees; when non-empty its size
 *                           determines the size of the hash ring
 * @param routerDispatcher   id of the dispatcher for the router itself
 * @param supervisorStrategy supervisor strategy of the router
 */
case class ConsistentHashRouter(
    nrOfInstances: Int = 0,
    routees: Iterable[String] = Nil,
    routerDispatcher: String = Dispatchers.DefaultDispatcherId,
    supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy)
  extends RouterConfig
  with ConsistentHashLike {

  /** Logger named after the runtime class of this router. */
  protected val log = Logger.getLogger(this.getClass.getName)
}
/**
 * Routing logic that selects a routee by hashing each message.
 *
 * A hash ring over the routee indices `0 until n` is built once, when the
 * trait is initialised; `n` is `routees.size` when explicit routees are given
 * and `nrOfInstances` otherwise. For every non-broadcast message the 4-byte
 * big-endian encoding of `message.hashCode` is looked up on the ring and the
 * resulting index selects the routee, so messages with equal hash codes are
 * sent to the same routee for as long as the routee list does not change.
 *
 * The ring is never rebuilt. If the live routee list later differs from the
 * one the ring was sized for, indices are wrapped into the valid range rather
 * than failing, and the message-to-routee mapping is no longer guaranteed to
 * be stable.
 */
trait ConsistentHashLike { this: RouterConfig =>

  /** Number of bytes used to encode a message's `hashCode` as the ring key. */
  val SIZEOF_INTEGER = 4

  protected val log: Logger

  /** Number of routees; consulted only when `routees` is empty. */
  def nrOfInstances: Int

  /** String paths of the routees; may be empty. */
  def routees: Iterable[String]

  // NOTE: `routeeIds` and `hash` are initialised eagerly in the trait body, so
  // `routees` and `nrOfInstances` must already be available at that point
  // (e.g. supplied as constructor parameters of the implementing class).

  /** Positions `0 until n` that are placed on the hash ring. */
  val routeeIds: List[Int] =
    (0 until (if (routees.isEmpty) nrOfInstances else routees.size)).toList

  /** Hash ring over `routeeIds`, built with a single replica per id. */
  val hash = new ConsistentHash(routeeIds, 1) // 1 replica

  /**
   * Registers the routees with `routeeProvider` and returns the route.
   *
   * `Broadcast` messages are sent to every current routee; any other message
   * is sent to the single routee chosen from the message's hash code.
   *
   * @param props          props passed on to `createAndRegisterRoutees`
   * @param routeeProvider provider that creates, registers and lists routees
   * @return the routing function for this router
   */
  def createRoute(props: Props, routeeProvider: RouteeProvider): Route = {
    routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees)

    // Picks the destination for a single (non-broadcast) message.
    def getNext(msg: Any): ActorRef = {
      val current = routeeProvider.routees
      if (current.isEmpty) {
        // Nothing left to route to: use dead letters instead of throwing.
        routeeProvider.context.system.deadLetters
      } else {
        val key = ByteBuffer.allocate(SIZEOF_INTEGER).putInt(msg.hashCode()).array()
        val node = hash.nodeFor(key)
        // The ring was sized when the trait was initialised; wrap the index so
        // that a shorter live routee list cannot cause an
        // IndexOutOfBoundsException. A no-op while the sizes still agree.
        current(node % current.size)
      }
    }

    // Bound to a name so the function literal can never be parsed as an
    // argument applied to the preceding block.
    val route: Route = {
      case (sender, message) =>
        message match {
          case Broadcast(msg) => toAll(sender, routeeProvider.routees)
          case msg            => List(Destination(sender, getNext(msg)))
        }
    }
    route
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment