Skip to content

Instantly share code, notes, and snippets.

@yoeluk
Last active August 29, 2015 14:18
Show Gist options
  • Save yoeluk/5d990ee5e40ded56ccd3 to your computer and use it in GitHub Desktop.
Save yoeluk/5d990ee5e40ded56ccd3 to your computer and use it in GitHub Desktop.
A queue manager for submitting jobs that contain parallel computations.
package extra.queuemanager
import akka.actor._
import scala.collection.mutable
import scala.collection.immutable
import scala.util.Failure
/**
* Created by yoelusa on 31/03/15.
*/
object Manager {

  /** A unit of work whose payload has type `A`. */
  sealed trait Work[+A]

  /** A job whose `document` is sent to `worker` once the manager admits it. */
  case class Job[+A](document: A, worker: ActorRef) extends Work[A]

  object Work {

    /**
     * Checks whether `work` would be admitted on its own, i.e. whether the tester `f` accepts a
     * job set containing only this job. Jobs failing this check can never run.
     */
    def canProcessJob[A](work: Work[A])(f: Tester[A]): Boolean = work match {
      case job: Job[A] => canProcessJobs(immutable.HashMap((job.document, job)))(f)
      case _ => false
    }

    /** Applies the tester `f` to a set of jobs keyed by document. */
    def canProcessJobs[A](currentJobs: immutable.HashMap[A, Work[A]])(f: Tester[A]): Boolean =
      f(currentJobs)
  }

  /** Admission predicate: returns true when the given set of jobs may run at the same time. */
  type Tester[A] = immutable.HashMap[A, Work[A]] => Boolean

  /** A submitted piece of work together with the actor that should receive its result. */
  case class Task[A](work: Work[A], requester: ActorRef)

  /**
   * Completion message handled by [[AbstractManager]]: `result` is forwarded to the requester of
   * the job keyed by `document` (presumably sent by the worker — not shown here).
   */
  case class JobDone[A](document: A, result: Any)

  /** A result paired with its requester (not referenced by the manager code shown here). */
  case class WorkResult(result: Any, requester: ActorRef)

  /** Implicit view that drops each task's requester so a task map can be passed to a [[Tester]]. */
  implicit def tasksToJobs[A]: mutable.Map[A, Task[A]] => immutable.HashMap[A, Work[A]] =
    _.foldLeft(immutable.HashMap.empty[A, Work[A]])((ac, m) => ac + ((m._1, m._2.work)))

  /**
   * Actor that runs jobs while the tester `f` accepts the set of running jobs, queues the rest in
   * submission order, and forwards each job's result to the actor that submitted it.
   *
   * @param f admission predicate applied to the running jobs plus a candidate job
   * @tparam T document type; documents are used as job keys
   */
  abstract class AbstractManager[T](f: Tester[T]) extends Actor with ActorLogging {
    import Work._

    def receive: Receive = {
      // `T` is erased at runtime, so these patterns match any Job / JobDone.
      case work: Job[T @unchecked] => processWork(work, sender())
      case done: JobDone[T @unchecked] =>
        // Only tracked jobs are answered; a JobDone for an unknown document is ignored.
        ongoingJobs.remove(done.document).foreach { task =>
          task.requester ! done.result
          processQueue()
        }
    }

    // Jobs waiting for capacity. LinkedHashMap keeps submission order, so the queue is FIFO
    // (a plain HashMap would hand out jobs in hash order and could starve some of them).
    private[queuemanager] final val queuedJobs = mutable.LinkedHashMap.empty[T, Task[T]]

    // Jobs currently dispatched to a worker, keyed by document.
    private[queuemanager] final val ongoingJobs = mutable.HashMap.empty[T, Task[T]]

    /**
     * Starts `work` immediately if it fits alongside the running jobs, queues it otherwise, and
     * rejects it if it would not fit even on its own.
     *
     * @param work       the submitted job
     * @param captSender the submitting actor; receives the job's result or the rejection
     */
    private[queuemanager] final def processWork(work: Job[T], captSender: ActorRef): Unit = {
      val task = Task(work, captSender)
      if (!canProcessJob(work)(f))
        // NOTE(review): with `ask`, replying scala.util.Failure completes the future
        // *successfully* with this value; Akka fails an ask future only on
        // akka.actor.Status.Failure — confirm which the requesters expect.
        captSender ! Failure(exception = new Throwable("The job is too large! "))
      else if (canProcessJobs(ongoingJobs + ((work.document, task)))(f))
        startTask(task)
      else
        queuedJobs += ((work.document, task))
    }

    /**
     * Starts queued jobs in submission order for as long as the job at the head of the queue fits
     * alongside the running ones. It stops at the first queued job that does not fit, so queued
     * jobs keep their relative order. New submissions that fit are still started directly by
     * `processWork`.
     */
    private[queuemanager] final def processQueue(): Unit = {
      @scala.annotation.tailrec
      def drain(): Unit = queuedJobs.headOption match {
        case Some(entry @ (document, task)) if canProcessJobs(ongoingJobs + entry)(f) =>
          queuedJobs -= document
          startTask(task)
          drain()
        case _ =>
      }
      drain()
    }

    /** Records `task` as running under its document and sends the document to the job's worker. */
    private def startTask(task: Task[T]): Unit = task.work match {
      case job: Job[T] =>
        // NOTE(review): a second job with an equal document overwrites the running entry, and
        // its first requester then never gets a reply — confirm documents are unique.
        ongoingJobs += ((job.document, task))
        job.worker ! job.document
    }
  }
}
package com.popeyepdf
import scala.concurrent.duration._
import scala.collection.immutable
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import org.apache.pdfbox.pdmodel._
import extra.queuemanager.Manager._
import akka.pattern.{ask, pipe}
/**
* Created by yoelusa on 24/03/15.
*/
/**
 * Envelope that `PopeyeParser` sends to a newly created job dispatcher.
 *
 * @param request   the parse request received by `PopeyeParser`
 * @param requester the actor that sent `request` to `PopeyeParser`
 */
case class JobRequest(
  request: ParseRequest,
  requester: ActorRef
)
/**
 * A PDF document ready to be submitted to the work manager as a parse job.
 *
 * @param document  the PDF document the job will process
 * @param requester the actor to which the manager's reply is piped
 */
case class JobPackage(
  document: PDDocument,
  requester: ActorRef
)
/**
 * Front-end actor. Each [[ParseRequest]] is handed to a freshly created JobDispatcher. Each
 * [[JobPackage]] it receives (presumably sent back by a dispatcher — not shown here) is submitted
 * to the shared work manager, and the manager's reply is piped to the package's requester.
 */
class PopeyeParser extends Actor with ActorLogging {
  // Brings the actor context (including its implicit dispatcher, needed by `pipeTo`) into scope.
  import context._
  import ManagerInitializer._

  // Maximum wait for the manager's reply. The manager only replies once the job has finished,
  // so this covers any time the job spends queued as well as its processing time.
  implicit val timeout: Timeout = Timeout(60.seconds)

  val workManager = context.actorOf(Props(new ManagerActor), name = "manager")

  def receive = {
    case request: ParseRequest =>
      val jobDispatcher = createDispatcher
      jobDispatcher ! JobRequest(request, sender())
    case pack: JobPackage =>
      val parser = createParser
      val job = Job(pack.document, parser)
      workManager ? job pipeTo pack.requester
    case _ =>
      sender() ! FailedResult(failedMsg = "Unknown request!")
  }

  // NOTE(review): a new child actor is created per message and never stopped here — confirm the
  // dispatcher and parser stop themselves when they are done.
  /** Creates a new child JobDispatcher actor. */
  def createDispatcher = {
    context.actorOf(Props(new JobDispatcher))
  }

  /** Creates a new child Parser (worker) actor. */
  def createParser = {
    context.actorOf(Props(new Parser))
  }
}
/**
 * Wiring for the PDF work manager. A set of jobs is admitted while the total page count of its
 * documents does not exceed the configured `running.parOps` limit.
 */
object ManagerInitializer {

  /** Work manager for PDF documents whose admission test is `eval compose pages`. */
  class ManagerActor extends AbstractManager(eval compose pages)

  val config = ConfigFactory.load("application")

  // Maximum total number of pages allowed across running jobs. getInt names the offending config
  // path on a bad value, whereas getString(...).toInt throws a bare NumberFormatException.
  val parOps: Int = config.getInt("running.parOps")

  /** True when a total page count is within the `parOps` limit. */
  def eval: Int => Boolean = parOps >= _

  /** Total number of pages across all documents (the map keys) in a job set. */
  def pages: immutable.HashMap[PDDocument, Work[PDDocument]] => Int =
    _.keysIterator.map(_.getNumberOfPages).sum
}
// Dispatcher and Parser (worker) actor implementations are not shown here!
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment