@kirked
Created October 23, 2015 23:44
Akka worker pool; backpressure configurable via dispatcher configuration.
package actors

import akka.actor._
import akka.pattern.ask
import scala.collection.immutable.Queue
import scala.concurrent.duration._
import scala.util.{Success, Failure}

object WorkerPool {
  val defaultTimeout = 30.seconds

  /** requests for worker pool; response will be worker response to `msg`. */
  case class Request(msg: Any, expectResponse: Boolean = true, timeout: Option[FiniteDuration] = Some(defaultTimeout))

  def newPool(workerProps: Props, workerCount: Int): Props =
    WorkRequesting.WorkManager.props(workerProps, workerCount)
}

// ActorGenerator2 and ErrorLogging are not defined in this gist; they are
// assumed to come from the author's code base, with ActorGenerator2
// supplying the `props` factory used below.
object WorkRequesting {
  private object PrivateMessages {
    case object GimmeWork
    case class WorkRequest(msg: Any, requestor: ActorRef, expectResponse: Boolean, timeout: FiniteDuration)
  }

  object WorkManager extends ActorGenerator2[WorkManager, Props, Int]

  class WorkManager(workerProps: Props, workerCount: Int)
    extends Actor
    with ActorLogging
    with ErrorLogging {
    import PrivateMessages._
    import WorkerPool._

    // Idle workers only; a worker is removed while it holds work.
    private var workers: Set[ActorRef] = Seq.fill(workerCount)(makeWorker).toSet
    private var workQueue = Queue.empty[WorkRequest]

    def receive = {
      case GimmeWork =>
        log.debug("GimmeWork from {}", sender())
        workers = workers + sender()
        routeWork

      // Only workers are watched, so every Terminated is a dead worker,
      // whether it was idle or busy; replace it to keep the pool size.
      case Terminated(actor) =>
        log.debug("{} TERMINATED", actor)
        workers = workers - actor + makeWorker
        routeWork

      case Request(msg, responseExpected, timeout) =>
        log.debug("REQUEST ({}, {}, {}) from {}", msg, responseExpected, timeout, sender())
        workQueue = workQueue.enqueue(WorkRequest(msg, sender(), responseExpected, timeout getOrElse defaultTimeout))
        routeWork

      // Any other message is treated as a request that expects a response.
      case msg if !workers.contains(sender()) =>
        log.debug("RAW {} from {}", msg, sender())
        workQueue = workQueue.enqueue(WorkRequest(msg, sender(), true, defaultTimeout))
        routeWork
    }

    // Hand queued work to idle workers until one of the two runs out.
    private def routeWork: Unit = {
      while (workers.nonEmpty && workQueue.nonEmpty) {
        val worker = workers.head
        workers = workers.tail
        val (work, updatedQueue) = workQueue.dequeue
        workQueue = updatedQueue
        worker ! work
      }
    }

    // Watch each worker so that its Terminated message is actually delivered.
    private def makeWorker: ActorRef =
      context.watch(context.actorOf(Worker.props(self, workerProps)))
  }

  object Worker extends ActorGenerator2[Worker, ActorRef, Props]

  class Worker(manager: ActorRef, workerProps: Props)
    extends Actor
    with ActorLogging
    with ErrorLogging {
    import PrivateMessages._

    // The actor that does the real work; watched so it can be replaced.
    var worker = context.watch(context.actorOf(workerProps))

    def receive = {
      case WorkRequest(msg, requestor, true, timeout) =>
        log.debug("WorkRequest({}, {}, true, {})", msg, requestor, timeout)
        import context.dispatcher
        // Ask for more work only once the reply arrives or the ask times out.
        worker.ask(msg)(akka.util.Timeout(timeout)) andThen {
          case Success(result) =>
            requestor ! result
            manager ! GimmeWork
          case Failure(cause) =>
            requestor ! cause
            manager ! GimmeWork
        }

      // No response expected: forward the message and ask for more work at once.
      case WorkRequest(msg, requestor, _, _) =>
        log.debug("WorkRequest({}, {}, false)", msg, requestor)
        worker.tell(msg, requestor)
        manager ! GimmeWork

      case Terminated(actor) if actor == worker =>
        log.info("worker {} TERMINATED; creating new", actor)
        worker = context.watch(context.actorOf(workerProps))

      case response if sender() == worker =>
        log.debug("asking for work")
        manager ! GimmeWork
    }
  }
}
kirked commented Oct 24, 2015

For example, a bounded pool of image processors, so as not to overload a machine.
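
The bound comes from the manager handing work only to workers that have asked for it. Below is a minimal, Akka-free sketch of that routing loop, not the gist's actual implementation: `PoolState`, `RoutingModel`, `submit` and `workerFree` are hypothetical names, and plain strings stand in for `ActorRef`s and `WorkRequest`s.

```scala
import scala.annotation.tailrec
import scala.collection.immutable.Queue

// Hypothetical stand-in for WorkManager's state: idle worker names and queued work.
final case class PoolState(idle: Set[String], queue: Queue[String])

object RoutingModel {
  type Assignment = (String, String) // (worker, work)

  // Pair idle workers with queued work until one side runs out,
  // mirroring the while loop in routeWork.
  def route(state: PoolState): (PoolState, List[Assignment]) = {
    @tailrec
    def loop(s: PoolState, acc: List[Assignment]): (PoolState, List[Assignment]) =
      if (s.idle.isEmpty || s.queue.isEmpty) (s, acc.reverse)
      else {
        val worker = s.idle.head
        val (work, rest) = s.queue.dequeue
        loop(PoolState(s.idle - worker, rest), (worker -> work) :: acc)
      }
    loop(state, Nil)
  }

  // A request arrives: enqueue it, then route.
  def submit(state: PoolState, work: String): (PoolState, List[Assignment]) =
    route(state.copy(queue = state.queue.enqueue(work)))

  // A worker reports back (the GimmeWork case): mark it idle, then route.
  def workerFree(state: PoolState, worker: String): (PoolState, List[Assignment]) =
    route(state.copy(idle = state.idle + worker))
}
```

With two idle workers, the first two calls to `submit` each produce an assignment, while a third leaves its work in the queue until `workerFree` is called. That waiting queue is what keeps the number of concurrent jobs at or below the worker count.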

Pool Creation

// `system` is an existing ActorSystem instance
val imageProcessor: ActorRef = system.actorOf(WorkerPool.newPool(ImageProcessor.props, 5), "image-processor")

Concurrency Control

Concurrency is controlled by two settings: the number of workers in the pool and the Akka dispatcher configured for the pool's actors. The example above creates 5 workers, so a dedicated dispatcher for the pool could look like this:

image-processing-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  throughput = 10
  thread-pool-executor {
    core-pool-size-min = 2
    core-pool-size-max = 6
    core-pool-size-factor = 1.0
  }
}

akka.actor.deployment {
  "/image-processor/*" {
    dispatcher = image-processing-dispatcher
  }
}
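
The dispatcher can also be requested in code instead of through the deployment section. The sketch below uses Akka's `Props.withDispatcher`; `DispatcherSketch` and `onImageDispatcher` are hypothetical names, and the dispatcher id is the one from the configuration block above, which must still be present in the configuration.

```scala
import akka.actor.Props

object DispatcherSketch {
  // Dispatcher id taken from the configuration block above.
  val imageDispatcherId = "image-processing-dispatcher"

  // Returns a copy of the given Props that requests the dedicated dispatcher.
  def onImageDispatcher(workerProps: Props): Props =
    workerProps.withDispatcher(imageDispatcherId)
}
```

With the gist's API this might be used as `WorkerPool.newPool(DispatcherSketch.onImageDispatcher(ImageProcessor.props), 5)`, so that the actors created from `workerProps` run on the dedicated thread pool.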

Messaging

Any Message

Any message other than a WorkerPool.Request is queued as a request that expects a response, with the default timeout of 30 seconds.

imageProcessor ! Process(imageFile1)

Pool Request Message

For more control, send a WorkerPool.Request, which lets you state whether a response is expected and set your own timeout.

imageProcessor ! WorkerPool.Request(Process(imageFile2), true, Some(5.seconds))
imageProcessor ! WorkerPool.Request(Delete(imageFile1), false)
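
How the defaults of a request resolve can be checked without starting an actor system. The following is a small sketch: it copies the shape of `WorkerPool.Request` from the gist, while `RequestDefaults` and `effectiveTimeout` are hypothetical names that mirror the `timeout getOrElse defaultTimeout` expression in `WorkManager`.

```scala
import scala.concurrent.duration._

object RequestDefaults {
  val defaultTimeout: FiniteDuration = 30.seconds

  // Same shape as WorkerPool.Request in the gist.
  final case class Request(
    msg: Any,
    expectResponse: Boolean = true,
    timeout: Option[FiniteDuration] = Some(defaultTimeout))

  // The timeout the manager would attach to the queued work.
  def effectiveTimeout(request: Request): FiniteDuration =
    request.timeout.getOrElse(defaultTimeout)
}
```

Because `timeout` is an `Option`, a custom value is wrapped in `Some`, and passing `None` falls back to the 30-second default in the same way as omitting the argument.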
