Skip to content

Instantly share code, notes, and snippets.

@ponkotuy
Last active December 20, 2015 04:09
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ponkotuy/6069151 to your computer and use it in GitHub Desktop.
Save ponkotuy/6069151 to your computer and use it in GitHub Desktop.
Akka Actor Sample
// sbt build definition for the Akka actor samples.
// Each setting is separated by a blank line so the file also parses with
// sbt releases that require blank lines between expressions in .sbt files.

name := "AkkaSample"

organization := "jp.co.datasection"

scalaVersion := "2.10.2"

// akka-actor supplies the core actor API; akka-contrib supplies the
// throttler imported by ThrottlerSample (akka.contrib.throttle).
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.2.0",
  "com.typesafe.akka" %% "akka-contrib" % "2.2.0"
)

// Use HTTPS: recent sbt releases warn about or refuse plain-HTTP resolvers,
// and artifacts should not be fetched over an unencrypted connection.
resolvers ++= Seq(
  "Typesafe Repository" at "https://repo.typesafe.com/typesafe/releases/"
)

scalacOptions ++= Seq(
  "-deprecation",
  "-unchecked",
  "-feature"
)
import akka.actor._
import scala.collection.mutable.ArrayBuffer
/** Message protocol accepted by [[Reaper]] actors. */
object Reaper {

  /**
   * Asks the reaper to start watching an actor.
   *
   * @param ref the actor whose termination the reaper should track
   */
  final case class WatchMe(ref: ActorRef)
}
/**
 * Watches every actor registered through [[Reaper.WatchMe]] and calls
 * [[allSoulsReaped]] once the last watched actor has terminated.
 */
abstract class Reaper extends Actor {
  import Reaper._

  /** Actors that were registered and have not been reported terminated yet. */
  val watched = ArrayBuffer.empty[ActorRef]

  /**
   * Hook that subclasses implement; invoked when a `Terminated` message
   * leaves [[watched]] empty.
   */
  def allSoulsReaped(): Unit

  /** Registers actors on `WatchMe` and unregisters them on `Terminated`. */
  final def receive: Receive = {
    case WatchMe(ref) =>
      // Ignore repeated registrations of the same actor. Only one entry is
      // removed per Terminated message, and watching an actor again is not
      // guaranteed to produce a second Terminated, so a duplicate entry could
      // stay behind forever and keep allSoulsReaped() from ever running.
      if (!watched.contains(ref)) {
        context.watch(ref)
        watched += ref
      }

    case Terminated(ref) =>
      watched -= ref
      if (watched.isEmpty) allSoulsReaped()
  }
}
/**
 * [[Reaper]] that shuts down the actor system it lives in once every
 * watched actor has terminated.
 */
class ShutdownReaper extends Reaper {

  /** Requests shutdown of the enclosing actor system and logs a line to stdout. */
  override def allSoulsReaped(): Unit = {
    context.system.shutdown()
    println("Shutdown from Reaper")
  }
}
import scala.concurrent.duration._
import akka.actor._
/**
 * Sample: two printer actors stop themselves when idle, and a
 * `ShutdownReaper` shuts the actor system down after both have stopped.
 */
object ReaperSample {

  /**
   * Prints every received message prefixed with `header`, and stops itself
   * when no message arrives within the receive timeout.
   *
   * @param header text printed in front of each message
   */
  class Printer(header: String) extends Actor {
    context.setReceiveTimeout(30.milliseconds)

    def receive: Receive = {
      case ReceiveTimeout => context.stop(self)
      case x =>
        println(s"$header$x")
        // Simulates slow work for the demo; this blocks the actor's thread.
        Thread.sleep(1000)
    }
  }

  /** Starts the printers, registers them with the reaper and sends the demo messages. */
  def main(args: Array[String]): Unit = {
    val system = ActorSystem()
    val printerA = system.actorOf(Props(new Printer("a: ")))
    val printerB = system.actorOf(Props(new Printer("b: ")))

    val reaper = system.actorOf(Props[ShutdownReaper])
    reaper ! Reaper.WatchMe(printerA)
    reaper ! Reaper.WatchMe(printerB)

    printerA ! 1
    printerB ! 2
    printerB ! 3
    printerA ! 4
    printerA ! 5
    // The system is expected to terminate after "a: 5" has been printed.
  }
}
import scala.concurrent.duration._
import akka.actor._
import akka.routing._
/**
 * Sample: ten messages are distributed over a round-robin router with four
 * printer routees; a `ShutdownReaper` watches the router.
 */
object RouterSample {

  /**
   * Prints every received message after a one-second delay, and stops itself
   * when no message arrives within the receive timeout.
   */
  class Printer extends Actor {
    context.setReceiveTimeout(30.milliseconds)

    def receive: Receive = {
      case ReceiveTimeout => context.stop(self)
      case x =>
        // Simulates slow work for the demo; real actors should not block.
        Thread.sleep(1000)
        println(x)
    }
  }

  /** Creates the router, registers it with the reaper and sends the numbers 1 to 10. */
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("sample")
    val router = system.actorOf(Props[Printer].withRouter(RoundRobinRouter(4)))

    val reaper = system.actorOf(Props[ShutdownReaper])
    reaper ! Reaper.WatchMe(router)

    (1 to 10).foreach(n => router ! n)
  }
}
import scala.concurrent.duration._
import akka.actor._
import akka.contrib.throttle._
import akka.contrib.throttle.Throttler._
/**
 * Sample: messages are sent to a printer through a `TimerBasedThrottler`
 * limited to three messages per second; a `ShutdownReaper` watches the printer.
 */
object ThrottlerSample {

  /**
   * Prints every received message, and stops itself when no message arrives
   * within the receive timeout.
   */
  class Printer extends Actor {
    // The timeout has to be longer than the throttler's one-second period;
    // otherwise the printer would stop while the throttler is still holding
    // back the remaining messages.
    context.setReceiveTimeout(1030.milliseconds)

    def receive: Receive = {
      case ReceiveTimeout => context.stop(self)
      case x => println(x)
    }
  }

  /** Wires printer, throttler and reaper together and sends five messages. */
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("sample")
    val printer = system.actorOf(Props[Printer])
    val throttler = system.actorOf(
      Props(new TimerBasedThrottler(3.msgsPer(1.seconds)))
    )
    throttler ! SetTarget(Some(printer))

    // Reaper settings: only the printer is watched, not the throttler.
    val reaper = system.actorOf(Props[ShutdownReaper])
    reaper ! Reaper.WatchMe(printer)

    // These three messages will be sent to the printer immediately
    throttler ! "1"
    throttler ! "2"
    throttler ! "3"
    // These two will wait at least until 1 second has passed
    throttler ! "4"
    throttler ! "5"
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment