Last active
August 29, 2015 14:18
-
-
Save yoeluk/5d990ee5e40ded56ccd3 to your computer and use it in GitHub Desktop.
a queue manager for submitting jobs containing parallel computations
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package extra.queuemanager | |
import akka.actor._ | |
import scala.collection.mutable | |
import scala.collection.immutable | |
import scala.util.Failure | |
/** | |
* Created by yoelusa on 31/03/15. | |
*/ | |
object Manager { | |
sealed trait Work[+A] | |
case class Job[+A](document: A, worker: ActorRef) extends Work[A] | |
object Work { | |
def canProcessJob[A](work: Work[A])(f: Tester[A]): Boolean = work match { | |
case job: Job[A] => canProcessJobs(immutable.HashMap((job.document, job)))(f) | |
case _ => false | |
} | |
def canProcessJobs[A](currentJobs: immutable.HashMap[A, Work[A]])(f: Tester[A]): Boolean = | |
f(currentJobs) | |
} | |
type Tester[A] = immutable.HashMap[A, Work[A]] => Boolean | |
case class Task[A](work: Work[A], requester: ActorRef) | |
case class JobDone[A](document: A, result: Any) | |
case class WorkResult(result: Any, requester: ActorRef) | |
implicit def tasksToJobs[A]: mutable.Map[A, Task[A]] => immutable.HashMap[A, Work[A]] = | |
_.foldLeft(immutable.HashMap.empty[A, Work[A]])((ac, m) => ac + ((m._1, m._2.work))) | |
abstract class AbstractManager[T](f: Tester[T]) extends Actor with ActorLogging { | |
import Work._ | |
def receive = { | |
case work: Job[T] => processWork(work, sender()) | |
case done: JobDone[T] => | |
ongoingJobs.get(done.document) match { | |
case Some(task) => | |
task.requester ! done.result | |
ongoingJobs -= done.document | |
processQueue() | |
case _ => | |
} | |
} | |
private[queuemanager] final val queuedJobs = mutable.HashMap.empty[T, Task[T]] | |
private[queuemanager] final val ongoingJobs = mutable.HashMap.empty[T, Task[T]] | |
private[queuemanager] final def processWork(work: Job[T], captSender: ActorRef) = canProcessJob(work)(f) match { | |
case true => canProcessJobs(ongoingJobs + ((work.document, Task(work, captSender))))(f) match { | |
case true => | |
val task = Task(work, captSender) | |
ongoingJobs += ((work.document, task)) | |
work.worker ! work.document | |
case _ => queuedJobs += ((work.document, Task(work, captSender))) | |
} | |
case _ => captSender ! Failure(exception = new Throwable("The job is too large! ")) | |
} | |
private[queuemanager] final def processQueue() = queuedJobs.headOption match { | |
case Some(kv) => canProcessJobs(ongoingJobs + kv)(f) match { | |
case true => | |
val task = queuedJobs.remove(kv._1).get | |
task.work match { | |
case work: Job[T] => | |
ongoingJobs += ((work.document, task)) | |
work.worker ! work.document | |
case _ => | |
} | |
case _ => | |
} | |
case _ => | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.popeyepdf | |
import scala.concurrent.duration._ | |
import scala.collection.immutable | |
import akka.util.Timeout | |
import com.typesafe.config.ConfigFactory | |
import org.apache.pdfbox.pdmodel._ | |
import extra.queuemanager.Manager._ | |
import akka.pattern.{ask, pipe} | |
/** | |
* Created by yoelusa on 24/03/15. | |
*/ | |
case class JobRequest(request: ParseRequest, requester: ActorRef) | |
case class JobPackage(document: PDDocument, requester: ActorRef) | |
class PopeyeParser extends Actor with ActorLogging { | |
import context._ | |
import ManagerInitializer._ | |
implicit val timeout = Timeout(60 seconds) | |
val workManager = context.actorOf(Props(new ManagerActor), name = "manager") | |
def receive = { | |
case request: ParseRequest => | |
val jobDispatcher = createDispatcher | |
jobDispatcher ! JobRequest(request, sender()) | |
case pack: JobPackage => | |
val parser = createParser | |
val job = Job(pack.document, parser) | |
workManager ? job pipeTo pack.requester | |
case _ => | |
sender() ! FailedResult(failedMsg = "Unknown request!") | |
} | |
def createDispatcher = { | |
context.actorOf(Props(new JobDispatcher)) | |
} | |
def createParser = { | |
context.actorOf(Props(new Parser)) | |
} | |
} | |
object ManagerInitializer { | |
class ManagerActor extends AbstractManager(eval compose pages) {} | |
val config = ConfigFactory.load("application") | |
val parOps = config.getString("running.parOps").toInt | |
def eval: Int => Boolean = {case p => parOps >= p} | |
def pages: immutable.HashMap[PDDocument, Work[PDDocument]] => Int = | |
_.foldLeft(0)((ac, w) => w match { | |
case (j,_) => j.getNumberOfPages + ac | |
case _ => ac | |
}) | |
} | |
// Dispatcher and Parser (worker) actor implementations are not shown here! |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment