name := "ScalaInHachioji"
version := "1.0"
scalaVersion := "2.10.4"
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
libraryDependencies +=
"com.typesafe.akka" %% "akka-actor" % "2.3.1"
- org.hachiojipm.sandbox パッケージ作る
- エントリポイントとなる AkkaTutor オブジェクト作る
package org.hachipjipm.sandbox
object AkkaTutor extends App{
println("hello akka")
}
- Actor クラスを作る
package org.hachipjipm.sandbox
import akka.actor.Actor
import akka.event.Logging
case object Hello
class HelloActor extends Actor{
val logger = Logging(context.system, this)
def receive: Receive = {
case Hello =>
logger.info("received hello")
}
}
ちなみに Receive はPartialFunction[Any, Unit]
のエイリアス
- エントリポイントを修正
package org.hachipjipm.sandbox
import akka.actor.ActorSystem
import akka.actor.Props
object AkkaTutor extends App{
val system = ActorSystem.create("akka-tutor")
// actor のインスタンスそのものじゃなくて、一枚 wrap されたActorRefが返ってくる
val helloActor = system.actorOf(Props[HelloActor])
helloActor ! Hello
}
* 実行してみる
ログが表示されたのが見れると思います。こうやって、アクターを作ってアクターにメッセージを送るみたいなやりかたで動く。
- HelloActor を WaitingHelloActor にする
package org.hachipjipm.sandbox
import akka.actor.Actor
import akka.event.Logging
case class WaitThenSay(sleep: Int, message: String)
class WaitingHelloActor extends Actor{
val logger = Logging(context.system, this)
def receive: Receive = {
case WaitThenSay(sleep, message) =>
Thread.sleep(sleep)
logger.info("received " + message)
}
}
ちなみにThread.sleep でブロッキングしちゃってるけど、ブロッキングするようなタスクについては注意点もあるので Actor Systems — Akka Documentation の「Blocking Needs Careful Management」の節を参照のこと。
- エントリポイントを修正
package org.hachipjipm.sandbox
import akka.actor.ActorSystem
import akka.actor.Props
object AkkaTutor extends App{
val system = ActorSystem.create("akka-tutor")
val actorA = system.actorOf(Props[WaitingHelloActor])
val actorB = system.actorOf(Props[WaitingHelloActor])
actorA ! WaitThenSay(2000, "i'm actorA")
actorB ! WaitThenSay(1000, "i'm actorB")
}
package org.hachipjipm.sandbox
import akka.actor.ActorSystem
import akka.actor.Props
object AkkaTutor extends App{
val system = ActorSystem.create("akka-tutor")
val actorA = system.actorOf(Props[WaitingHelloActor])
actorA ! WaitThenSay(2000, "first message")
println("sent first message")
actorA ! WaitThenSay(1000, "second message")
println("sent second message")
}
- アクターは同時にひとつのメッセージしか処理しない。
- メッセージは mailbox というところに届いていて、そこに届いたメッセージを順に処理していく。
- メッセージ到達順序や実行順序の保証について、詳しくは Message Delivery Reliability — Akka Documentation を参照のこと
package org.hachipjipm.sandbox
import akka.actor.{Props, Actor}
import akka.event.Logging
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
case object Start
case object Ping
case object Pong
class ActorA extends Actor {
val logger = Logging(context.system, self)
val b = context.actorOf(Props[ActorB])
def receive: Receive = {
case Start =>
implicit val timeout = Timeout(5 second)
implicit val dispatcher = context.dispatcher
val response= b ? Ping
response.foreach(message => logger.info(message.toString))
}
}
class ActorB extends Actor {
val logger = Logging(context.system, self)
def receive: Receive = {
case Ping =>
logger.info(Ping.toString)
// sender() でメッセージを送ってきたアクターを取得できる
sender() ! Pong
}
}
package org.hachipjipm.sandbox
import akka.actor.ActorSystem
import akka.actor.Props
object AkkaTutor extends App{
val system = ActorSystem.create("akka-tutor")
val a = system.actorOf(Props[ActorA], "root")
a ! Start
}
非同期でめんどうなことのうちのひとつに例外処理がありますね。例外はどう扱われるのか
package org.hachipjipm.sandbox
import akka.actor.Actor
import akka.event.Logging
object ReportActor {
case class Say(message: String)
}
class ReportActor extends Actor{
import ReportActor._
val logger = Logging(context.system, this)
def receive: Receive = {
case Say(message) =>
logger.info(message)
case _ =>
throw new Exception("i don't know how to handle message")
}
}
package org.hachipjipm.sandbox
import akka.actor.{Props, Actor}
import akka.event.Logging
object RootActor {
case object Run
}
class RootActor extends Actor{
import RootActor._
val logger = Logging(context.system, this)
def receive : Receive = {
case Run =>
val actor = context.actorOf(Props[ReportActor])
actor ! ReportActor.Say("にゃん")
actor ! "にゃん"
actor ! ReportActor.Say("にゃん")
}
}
「!?」壊れたはずのアクターが生きてる!!!!!これを説明します。
ここ の Actor Lifecycle を読むとくわしいことが書いてある。わたしのことばでまとめ直すと、
- Actor は一度作られたら明示的に止める(方法は後述)しないと止まらない。
- 例外吐いても止まらない。デフォルトの挙動では restart される。
- restart されると、新しいactor instanceが作られて preRestart が呼ばれる。
- 例外吐いた actor は、この新しい instance で置き換えられる
- postRestart 呼ばれる
- この仕組みのために、actorがActorRefにwrapされてるわけですね。賢い。
実際に確かめてみよう
package org.hachipjipm.sandbox
import akka.actor.Actor
import akka.event.Logging
object ReportActor {
case class Say(message: String)
}
class ReportActor extends Actor{
import ReportActor._
val logger = Logging(context.system, this)
logger.info("initializing actor...")
override def preStart = {
logger.info("preStart is called")
}
override def preRestart(reason:Throwable, message:Option[Any]) = {
// reason に、restart する原因になった例外が入ってくる
// message の処理中に restart した場合その message が入ってくる
super.preRestart(reason, message)
logger.info("preRestart is called")
}
override def postRestart(reason:Throwable) = {
// reason に、restart する原因になった例外が入ってくる
super.postRestart(reason)
logger.info("postRestart is called")
}
def receive: Receive = {
case Say(message) =>
logger.info(message)
case _ =>
throw new Exception("i don't know how to handle message")
}
}
- PoisonPill を送る
package org.hachipjipm.sandbox
import akka.actor.{PoisonPill, Terminated, Props, Actor}
import akka.event.Logging
object RootActor {
case object Run
}
class RootActor extends Actor{
import RootActor._
val logger = Logging(context.system, this)
def receive : Receive = {
case Run =>
val actor = context.actorOf(Props[ReportActor]) // actor の supervisor は RootActorになる
actor ! ReportActor.Say("にゃん")
actor ! PoisonPill
actor ! ReportActor.Say("にゃん")
}
}
package org.hachipjipm.sandbox
import akka.actor.Actor
import akka.event.Logging
object ReportActor {
case class Say(message: String)
}
class ReportActor extends Actor{
import ReportActor._
val logger = Logging(context.system, this)
def receive: Receive = {
case Say(message) =>
logger.info(message)
case _ =>
throw new Exception("i don't know how to handle message")
}
}
- 自分で止まる
package org.hachipjipm.sandbox
import akka.actor.{PoisonPill, Terminated, Props, Actor}
import akka.event.Logging
object RootActor {
case object Run
}
class RootActor extends Actor{
import RootActor._
val logger = Logging(context.system, this)
def receive : Receive = {
case Run =>
val actor = context.actorOf(Props[ReportActor]) // actor の supervisor は RootActorになる
actor ! ReportActor.Say("にゃん")
actor ! ReportActor.Say("にゃん")
}
}
package org.hachipjipm.sandbox
import akka.actor.Actor
import akka.event.Logging
object ReportActor {
case class Say(message: String)
}
class ReportActor extends Actor{
import ReportActor._
val logger = Logging(context.system, this)
def receive: Receive = {
case Say(message) =>
logger.info(message)
context.stop(self) //self は自分のActorRefを指す
case _ =>
throw new Exception("i don't know how to handle message")
}
}
Actorを作るときには ActorSystem か ActorContext に対して actorOf を呼んで作る。このとき、
- ActorSystem#actorOf で作られたアクターはユーザー空間のトップレベルのアクターになる。
- ActorContext#actorOf で作られたアクターは、その context を保持するActorによって監視される
監視してるマンは、監視されてるマンの例外を捕捉して、どうハンドルするかを決められる
package org.hachipjipm.sandbox
import akka.actor.Actor
import akka.event.Logging
object ReportActor {
case class Say(message: String)
class CantHandleMessage(message:String) extends Exception(message:String)
class Drunk(message:String) extends Exception(message:String)
}
class ReportActor extends Actor{
import ReportActor._
val logger = Logging(context.system, this)
var drunk = 0
def receive: Receive = {
case Say(message) =>
logger.info(message)
case "beer" =>
drunk += 1
if ( drunk >= 10 ) {
throw new Drunk("びーるのんでつかいものにならなくなっちゃった")
}
case _ =>
throw new CantHandleMessage("i don't know such a message")
}
}
package org.hachipjipm.sandbox
import akka.actor._
import akka.event.Logging
import scala.concurrent.duration._
object RootActor {
case object Run
}
class RootActor extends Actor{
import RootActor._
val logger = Logging(context.system, this)
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minutes) {
case _ : ReportActor.CantHandleMessage => SupervisorStrategy.Restart
case _ : ReportActor.Drunk => SupervisorStrategy.Stop
}
def receive : Receive = {
case Run =>
val actor = context.actorOf(Props[ReportActor]) // actor の supervisor は RootActorになる
actor ! ReportActor.Say("にゃん")
actor ! "にゃん"
for { i <- 1.to(10)} {
actor ! "beer"
}
actor ! ReportActor.Say("にゃん")
}
}
さらに詳しいことはFault Tolerance — Akka Documentationを読むと書いてある
- TypedActor ってのもある see Typed Actors
- Actor同士はネットワーク上で連携可能である、つまり、複数のJVMでクラスタを組むことが可能である see Networking。このへんはまだわたしも学習中。
- Actor はそれぞれ Path を持っていて、その path で identify 可能である see Actor References, Paths and Addresses — Akka Documentation