Skip to content

Instantly share code, notes, and snippets.

@jaytaylor
Created March 22, 2012 23:10
Show Gist options
  • Save jaytaylor/2165352 to your computer and use it in GitHub Desktop.
Save jaytaylor/2165352 to your computer and use it in GitHub Desktop.
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.routing
import akka.actor.{ UntypedActor, Actor, ActorRef, ForwardableChannel }
/**
* A Dispatcher is a trait whose purpose is to route incoming messages to actors.
*/
trait Dispatcher { this: Actor ⇒
protected def transform(msg: Any): Any = msg
protected def routes: PartialFunction[Any, ActorRef]
protected def broadcast(message: Any) {}
protected def dispatch: Receive = {
case Routing.Broadcast(message) ⇒ broadcast(transform(message))
case a if routes.isDefinedAt(a) ⇒
if (isSenderDefined) routes(a).forward(transform(a))(someSelf)
else routes(a).!(transform(a))(None)
}
def receive = dispatch
protected def isSenderDefined = self.channel.isInstanceOf[ForwardableChannel]
}
/**
* An UntypedDispatcher is an abstract class whose purpose is to route incoming messages to actors.
*/
abstract class UntypedDispatcher extends UntypedActor {
protected def transform(msg: Any): Any = msg
protected def route(msg: Any): ActorRef
protected def broadcast(message: Any) {}
protected def isSenderDefined = self.channel.isInstanceOf[ForwardableChannel]
@throws(classOf[Exception])
def onReceive(msg: Any): Unit = msg match {
case Routing.Broadcast(message) ⇒ broadcast(transform(message))
case msg =>
val r = route(msg)
if (r eq null) throw new IllegalStateException("No route for " + msg + " defined!")
if (isSenderDefined) r.forward(transform(msg))(someSelf)
else r.!(transform(msg))(None)
}
}
/**
* A LoadBalancer is a specialized kind of Dispatcher, that is supplied an InfiniteIterator of targets
* to dispatch incoming messages to.
*/
trait LoadBalancer extends Dispatcher { self: Actor ⇒
protected def seq: InfiniteIterator[ActorRef]
protected def routes = { case x if seq.hasNext ⇒ seq.next }
override def broadcast(message: Any) =
seq.items.foreach( a => if (isSenderDefined) a.forward(message)(someSelf) else a.!(message)(None))
override def isDefinedAt(msg: Any) = seq.exists(_.isDefinedAt(msg))
}
/**
* A UntypedLoadBalancer is a specialized kind of UntypedDispatcher, that is supplied an InfiniteIterator of targets
* to dispatch incoming messages to.
*/
abstract class UntypedLoadBalancer extends UntypedDispatcher {
protected def seq: InfiniteIterator[ActorRef]
protected def route(msg: Any) = if (seq.hasNext) seq.next else null
override def broadcast(message: Any) =
seq.items.foreach( a => if (isSenderDefined) a.forward(message)(someSelf) else a.!(message)(None))
override def isDefinedAt(msg: Any) = seq.exists(_.isDefinedAt(msg))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment