Skip to content

Instantly share code, notes, and snippets.

@helena
Created October 31, 2013 22:18
Show Gist options
  • Save helena/7258149 to your computer and use it in GitHub Desktop.
Save helena/7258149 to your computer and use it in GitHub Desktop.
import scala.language.postfixOps
import scala.collection.immutable
import scala.collection.immutable.Queue
import scala.concurrent.forkjoin.ThreadLocalRandom
import akka.actor.{ ActorLogging, ActorRef, Actor }
import akka.routing.{ CurrentRoutees, RouterRoutees, Broadcast }
import akka.util.Timeout
import java.util.concurrent.atomic.AtomicInteger
import com.crowdstrike.cloud.InternalLocationAction._
import com.crowdstrike.cloud.UserLocationAction.{ QueuedMessage, Routees }
/**
 * INTERNAL API.
 *
 * Distributes `Publish` and `Broadcast` messages for one channel over a set of routees.
 * Not an Akka router as the routees are not pre-configured but rather continuously fed in
 * (via `RouterRoutees` messages, see `refresh`). Messages that arrive while no valid routee
 * is known are held in a bounded FIFO queue and replayed once routees become available.
 *
 * @param channel    the channel this router serves; supplies the initial routees and is
 *                   reported in `NoRouteesFound` failures
 * @param queueLimit the queue size at which the oldest queued message is evicted to make
 *                   room for a new one
 *
 * @author Helena Edelson
 */
private[cloud] abstract class LoadBalancingLocationsRouter(val channel: ClusterChannel, queueLimit: Int) extends Actor
  with ActorLookup with ActorLogging {

  import LocationEvent._

  /** The currently known routees, seeded from the channel and updated by `refresh`. */
  protected var routees: Set[ActorRef] = channel.routees.map(_.routee)

  /** Messages waiting for a routee to become available, oldest first. */
  protected var queued: immutable.Queue[QueuedMessage] = Queue.empty

  override def preStart(): Unit = log.info("{} started", self.path)

  override def postStop(): Unit = routees = Set.empty

  def receive: Receive = {
    case CurrentRoutees        => query(sender)
    case RouterRoutees(latest) => refresh(latest)
    case Publish(message)      => publish(sender, message)
    case Broadcast(message)    => broadcast(sender, message)
    case m: QueuedMessage      => enqueue(m)
  }

  /**
   * The implementation strategy: selects the routee that receives the next published message.
   * Only called by `publish` after `routees` has been checked to be non-empty.
   */
  def getNext: ActorRef

  /**
   * Sends `message` to the single routee chosen by `getNext`, or hands it to `unroutable`
   * when no valid routee is left.
   *
   * Pre-filters again for those not terminated and not [[akka.actor.ActorSystem.deadLetters]]
   * to handle any state inconsistencies between the time akka notifies locations of a downed
   * actor, the time locations removes that from the routees, and the time this gets a message.
   *
   * @param sender  the original publisher; the routee sees this actor as the message sender
   * @param message the payload to route
   */
  def publish(sender: ActorRef, message: Any): Unit = {
    routees = routees filter (isValid(_))
    // Explicit `tell` rather than `forward`: `forward` takes the sender from the actor context,
    // which is the `RouterRoutees` sender (not the publisher) when a queued message is replayed
    // from `refresh`. On the normal `receive` path both are the same actor.
    if (routees.nonEmpty) getNext.tell(message, sender)
    else unroutable(QueuedMessage(sender, message))
  }

  /**
   * Sends `message` to every valid routee, or hands it to `unroutable` (flagged as a broadcast)
   * when no valid routee is left.
   *
   * Pre-filters again for those not terminated and not [[akka.actor.ActorSystem.deadLetters]]
   * to handle any state inconsistencies between the time akka notifies locations of a downed
   * actor, the time locations removes that from the routees, and the time this gets a message.
   *
   * @param sender  the original broadcaster; recorded only when the message has to be queued
   * @param message the payload to deliver to all routees
   */
  def broadcast(sender: ActorRef, message: Any): Unit = {
    routees = routees filter (isValid(_))
    if (routees.nonEmpty) routees foreach (_ ! message)
    else unroutable(QueuedMessage(sender, message, true))
  }

  /**
   * Handles a message for which no routee is available: queues it if no message with the same
   * id is already queued, otherwise replies to the message's original sender with a
   * `PublicationFailure`.
   *
   * @param m the message that could not be routed, together with its original sender
   */
  def unroutable(m: QueuedMessage): Unit = {
    log.info("Received unroutable message from {}", m.sender)
    if (!queued.exists(_.id == m.id)) enqueue(m)
    // Reply to the sender recorded in the message: the context sender is a different actor
    // when this is reached while replaying the queue from `refresh`.
    else m.sender ! PublicationFailure(NoRouteesFound(channel), m.message)
  }

  /**
   * Queues messages for sending when routees come online. If the queue reaches the configured limit in
   * [[com.crowdstrike.cloud.CloudSettings.ClusterLocationsRouterQueueLimit]]
   * FIFO takes effect and the oldest message queued is dequeued to allow the most recent to be queued.
   *
   * A non-positive `queueLimit` never fails: the queue then simply holds the most recent message.
   *
   * @param message the message to append to the queue
   */
  def enqueue(message: QueuedMessage): Unit = {
    log.info("No routees found for {}, queueing message until routees become available.", channel.id)
    // `nonEmpty` guards `dequeue`, which throws on an empty queue (reachable when the limit is 0);
    // `>=` keeps the queue bounded even if the limit is negative.
    if (queued.nonEmpty && queued.size >= queueLimit) {
      val (dequeued, updated) = queued.dequeue
      queued = updated
      log.info("Dequeued {}, updated queue size {}", dequeued.message, queued.size)
    }
    queued = queued.enqueue(message)
    log.debug("Enqueued {}, queue size {}", message, queued.size)
  }

  /**
   * Adds new and removes stale routees and attempts to drain the `queued` messages, by the implemented
   * load-balancing strategy, if routees are not empty.
   *
   * @param latest the complete, current set of routees; known routees missing from it are dropped
   */
  def refresh(latest: Iterable[ActorRef]): Unit = {
    val latestSet: Set[ActorRef] = latest.toSet
    val stale = routees &~ latestSet
    routees --= stale
    val unseen = latest collect { case r if isValid(r) && !routees.contains(r) => r }
    routees ++= unseen

    if (routees.nonEmpty) {
      // Clear the queue BEFORE replaying: `publish`/`broadcast` re-validate the routees and may
      // re-queue a message through `unroutable`. Clearing afterwards would discard those
      // re-queued messages.
      val pending = queued
      queued = Queue.empty
      pending foreach { m =>
        if (m.isBroadcast) broadcast(m.sender, m.message) else publish(m.sender, m.message)
      }
    }
  }

  /**
   * Replies to `sender` with the currently known routees wrapped in `RouterRoutees`.
   *
   * @param sender the actor that asked for the current routees
   */
  def query(sender: ActorRef): Unit = sender ! RouterRoutees(routees)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment