Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Conspire's implementation of work pulling
// Concrete leader for the analytics service: fixes Leader's type parameters to
// ProcessUser (work unit), AnalyticsNode (worker node) and AnalyticsMessage
// (job message), and forwards `supervisor` to the Leader constructor unchanged.
class AnalyticsLeader(supervisor: ActorRef) extends Leader[ProcessUser, AnalyticsNode, AnalyticsMessage](supervisor)
// Concrete worker node for the analytics service: fixes Node's type parameters
// to AnalyticsProcessor (the processor actor) and AnalyticsMessage (job
// message), and passes "analytics" as the Node's serviceName.
class AnalyticsNode extends Node[AnalyticsProcessor, AnalyticsMessage]("analytics")
package com.goconspire.commons
import scala.concurrent.duration._
import scala.collection.mutable.{Map, Queue}
import scala.reflect.ClassTag
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorLogging
import akka.actor.Props
import akka.actor.Terminated
import akka.event.LoggingReceive
import akka.routing.FromConfig
import akka.util.Timeout
import com.goconspire.commons.messages._
/**
 * Work-pulling leader. Keeps a queue of pending work units and a registry of
 * known workers; workers are told when work is available and then pull one
 * unit at a time.
 *
 * The `ClassTag` context bounds are required: `Props[N]` needs a `ClassTag[N]`
 * to create the node router, and `ClassTag[U]` makes the `case work: U` match
 * a real runtime check instead of an erased (unchecked) one. Callers that
 * instantiate the class with concrete type arguments are unaffected.
 *
 * @tparam U type of a single unit of work handed to a worker
 * @tparam N node (worker) actor type deployed behind the "nodeRouter" router
 * @tparam J job message family the triggers belong to
 * @param supervisor supervisor reference; not referenced inside this class body
 */
class Leader[U <: WorkUnit : ClassTag, N <: Node[_, J] : ClassTag, J <: JobMessage[J]](supervisor: ActorRef)
    extends Actor with ActorLogging {

  // Create the cluster-aware router for managing remote routees
  context.actorOf(Props[N].withRouter(FromConfig()), "nodeRouter")

  // Known workers mapped to the (requester, work) pair they are currently
  // processing; None means the worker is idle. (Mutable map, see imports.)
  val workers = Map.empty[ActorRef, Option[(ActorRef, U)]]

  // Pending work, each unit paired with the actor that asked for it.
  val workQueue = Queue.empty[(ActorRef, U)]

  /**
   * Notifies workers that there's work available, provided they're
   * not already working on something. Does nothing when the queue is empty.
   */
  def notifyWorkers(): Unit = {
    if (workQueue.nonEmpty) {
      workers.foreach {
        case (worker, None) => worker ! WorkIsReady
        case _ => // busy worker: it will ask again once it finishes
      }
    }
  }

  override def preStart(): Unit = {
    log.info("Starting leader at {}", self.path.toStringWithAddress(self.path.address))
  }

  def receive = LoggingReceive {
    // A new worker announced itself: watch it so we hear about its death,
    // register it as idle and tell workers about any queued work.
    case WorkerCreated(worker) =>
      log.info("Worker created: {}", worker)
      context.watch(worker)
      workers += (worker -> None)
      notifyWorkers()

    // A worker asks for work. Unknown workers are ignored; a worker that is
    // still marked busy gets nothing while the queue is non-empty.
    case WorkerRequestsWork(worker) =>
      log.info("Worker requests work: {}", worker)
      if (workers.contains(worker)) {
        if (workQueue.isEmpty)
          worker ! NoWorkToBeDone
        else if (workers(worker).isEmpty) {
          val (workSender, work) = workQueue.dequeue()
          workers += (worker -> Some(workSender -> work))
          // Use the special form of 'tell' that lets us supply
          // the sender
          log.info("Sending work {} to {}", work, worker)
          worker.tell(WorkToBeDone(work), workSender)
        }
      }

    // Worker has completed its work and we can clear it out
    case WorkIsDone(msg, worker) =>
      // `workers.get` yields Option[Option[...]]: the outer level says whether
      // the worker is known, the inner level whether it had work assigned.
      workers.get(worker) match {
        case Some(Some((requester, _))) =>
          requester ! msg
          workers += (worker -> None)
        case Some(None) =>
          log.warning("{} said it's done work but it had no work assigned", worker)
        case None =>
          log.error("Blurgh! {} said it's done work but we didn't know about him", worker)
      }

    // A watched worker died: requeue whatever it was processing, then forget it.
    case Terminated(worker) =>
      workers.get(worker) match {
        case Some(inFlight @ Some((workSender, work))) =>
          log.error("Blurgh! {} died while processing {}", worker, inFlight)
          // Send the work that it was doing back to ourselves for processing
          self.tell(work, workSender)
        case _ => // unknown or idle worker: nothing to recover
      }
      workers -= worker

    // A trigger carrying a collection of items: queue every derived work unit.
    case trigger: JobTrigger[J, JobAcknowledged[J], JobFailed[J], U] with CollectionJobMessage[_, J] =>
      log.info("Queueing {} items", trigger.items.size)
      trigger.toWorkUnits.foreach { work =>
        workQueue.enqueue(sender -> work)
      }
      notifyWorkers()

    // A trigger carrying a single item.
    case trigger: JobTrigger[J, JobAcknowledged[J], JobFailed[J], U] with ItemJobMessage[_, U] =>
      log.info("Queuing single item")
      trigger.toWorkUnits.foreach { work =>
        workQueue.enqueue(sender -> work)
      }
      notifyWorkers()

    // A bare work unit (e.g. one re-sent to self after a worker died).
    case work: U =>
      workQueue.enqueue(sender -> work)
      notifyWorkers()
  }
}
// Work Request Messages
// Messages from Workers
// Announces a new worker; the Leader watches it and registers it as idle.
case class WorkerCreated(worker: ActorRef)
// The given worker asks the Leader for a unit of work.
case class WorkerRequestsWork(worker: ActorRef)
// The given worker finished its unit; `msg` is the response the Leader
// forwards to the actor that originally requested the work.
case class WorkIsDone[M <: JobMessage[M]](msg: JobResponse[M], worker: ActorRef)
// Messages to Workers
// Carries one unit of work to a worker that requested it.
case class WorkToBeDone(work: WorkUnit)
// Tells idle workers that the Leader's queue is non-empty.
case object WorkIsReady
// Reply to a work request when the Leader's queue is empty.
case object NoWorkToBeDone
// A single unit of work that the Leader queues and hands to a worker.
trait WorkUnit {
// Builds the failure response for this unit from a textual reason.
def failed(reason: String): JobFailed[_]
}
// Job messages
// Base trait for every job message; T is the implementing message family
// itself (self-referential bound).
trait JobMessage[T <: JobMessage[T]] {
// Metadata describing the job this message belongs to.
val info: JobInfo
}
// Metadata attached to every JobMessage.
// NOTE(review): the meaning of each field is not established in this file;
// the field names are the only documentation — confirm with the producers.
trait JobInfo {
val role: String
val command: String
val jobId: Long
// NOTE(review): presumably java.util.UUID — no import is visible here; confirm.
val uuid: UUID
}
// A job message that starts work. Type parameters: M is the message family,
// A the acknowledgement type, F the failure type and U the work-unit type
// (U is not used by any member declared in this trait).
trait JobTrigger[M <: JobMessage[M], A <: JobAcknowledged[M], F <: JobFailed[M], U <: WorkUnit] extends JobMessage[M] {
// Builds the failure response for this trigger from a textual reason.
def failed(reason: String): F
// Builds the acknowledgement response for this trigger.
def acknowledged: A
}
// Base trait for responses to a job; carried back to the requester inside WorkIsDone.
trait JobResponse[M <: JobMessage[M]] extends JobMessage[M]
// Response signalling that the job failed.
trait JobFailed[M <: JobMessage[M]] extends JobResponse[M]
// Response signalling that the job was acknowledged.
trait JobAcknowledged[M <: JobMessage[M]] extends JobResponse[M]
// Response signalling that the job completed.
trait JobCompleted[M <: JobMessage[M]] extends JobResponse[M]
package com.goconspire.commons
import scala.reflect.ClassTag
import scala.reflect.classTag
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.Props
import com.goconspire.commons.messages._
/**
 * Worker node in the work-pulling scheme. It starts idle, asks for work when
 * told that work is ready, spawns a `Processor` child for each unit it
 * receives, and reports the outcome before going back to idle.
 *
 * All messages intended for the leader are wrapped in `NotifyLeader` and sent
 * to the actor selection "/user/facade".
 *
 * @tparam Processor actor class instantiated (with this node's `self` as its
 *                   constructor argument) to process one unit of work
 * @tparam M         job message family this node handles
 * @param serviceName name passed along in every `NotifyLeader` envelope
 */
abstract class Node[Processor <: Actor : ClassTag, M <: JobMessage[M]](serviceName: String)
    extends Actor with ActorLogging {

  val facade = context.actorSelection("/user/facade")

  /** Wraps `msg` in the envelope used for messages addressed to the leader. */
  def leaderMsg(msg: Any) = NotifyLeader(serviceName, msg)

  /** Props for a processor child; `self` is passed as its constructor argument. */
  def props: Props = Props(classTag[Processor].runtimeClass, self)

  override def preStart(): Unit = {
    // Announce ourselves so the leader can register this worker.
    facade ! leaderMsg(WorkerCreated(self))
  }

  // Reports a final response for the current unit, immediately asks for more
  // work, stops the actor that sent the response and returns to idle.
  private def reportAndReset(response: JobResponse[M]): Unit = {
    facade ! leaderMsg(WorkIsDone(response, self))
    facade ! leaderMsg(WorkerRequestsWork(self))
    // NOTE(review): the sender is presumably the processor child created in
    // `idle`; confirm that responses always come from that child.
    context.stop(sender)
    context.become(idle)
  }

  /**
   * Behaviour while a unit of work is being processed.
   *
   * @param work the unit currently in progress (not inspected by this behaviour)
   */
  def working(work: Any): Receive = {
    case WorkIsReady => // do nothing
    case NoWorkToBeDone => // do nothing
    // Match the message instance; a bare `WorkToBeDone` pattern would only
    // match the companion object and never an actual work message.
    case WorkToBeDone(_) =>
      log.error("Node busy")
    // Acknowledgements are relayed without changing state.
    case sa: JobAcknowledged[M] =>
      facade ! leaderMsg(sa)
    case ca: JobCompleted[M] =>
      reportAndReset(ca)
    case fa: JobFailed[M] =>
      reportAndReset(fa)
  }

  /** Behaviour while no work is in progress. */
  def idle: Receive = {
    case WorkIsReady =>
      log.info("Requesting work")
      facade ! leaderMsg(WorkerRequestsWork(self))
    case WorkToBeDone(work) =>
      log.info("Got work {}", work)
      // One fresh processor per unit of work.
      val processor = context.actorOf(props)
      processor ! work
      context.become(working(work))
    case NoWorkToBeDone => // do nothing
  }

  def receive = idle
}
@oxlade39

This comment has been minimized.

Copy link

commented Jan 28, 2014

What's the purpose of the supervisor in the Leader? Unless I've missed something it doesn't appear to be used.

@smilingleo

This comment has been minimized.

Copy link

commented Apr 3, 2015

In Leader.scala, line 66, the pattern
should be:

workers.get(worker) match {
  case Some(Some((requester, _))) => 
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.