Skip to content

Instantly share code, notes, and snippets.

@Shinpeim

Shinpeim/actor.md Secret

Last active Aug 29, 2015
Embed
What would you like to do?

はじめてのActor

// sbt build definition for the Akka tutorial project.
name := "ScalaInHachioji"

version := "1.0"

scalaVersion := "2.10.4"

// Additional repository consulted when resolving dependencies.
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

// `%%` appends the Scala binary version to the artifact name.
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.3.1"
  • org.hachiojipm.sandbox パッケージ作る
  • エントリポイントとなる AkkaTutor オブジェクト作る
package org.hachipjipm.sandbox

/** Minimal entry point: writes a greeting to standard output. */
object AkkaTutor extends App {
  Console.println("hello akka")
}
  • Actor クラスを作る
package org.hachipjipm.sandbox
import akka.actor.Actor
import akka.event.Logging

/** Message understood by [[HelloActor]]. */
case object Hello

/** Actor that writes one info-level log line for every [[Hello]] it receives. */
class HelloActor extends Actor {
  val logger = Logging(context.system, this)

  // Only Hello is matched; the partial function is undefined for anything else.
  def receive: Receive = {
    case Hello => logger.info("received hello")
  }
}

ちなみに Receive はPartialFunction[Any, Unit]のエイリアス

  • エントリポイントを修正
package org.hachipjipm.sandbox

import akka.actor.ActorSystem
import akka.actor.Props

/** Entry point: starts an actor system, creates a HelloActor and sends it Hello. */
object AkkaTutor extends App {
  val system = ActorSystem("akka-tutor")

  // actorOf does not return the actor instance itself but an ActorRef wrapping it.
  val helloActor = system.actorOf(Props(classOf[HelloActor]))

  helloActor ! Hello
}

* 実行してみる

ログが表示されたのが見れると思います。こうやって、アクターを作ってアクターにメッセージを送るみたいなやりかたで動く。

非同期なんやで

  • HelloActor を WaitingHelloActor にする
package org.hachipjipm.sandbox
import akka.actor.Actor
import akka.event.Logging

/**
 * Request handled by [[WaitingHelloActor]].
 *
 * @param sleep   value passed to `Thread.sleep` before logging
 * @param message text appended to the log line
 */
case class WaitThenSay(sleep: Int, message: String)

/** Actor that sleeps for the requested time and then logs the requested text. */
class WaitingHelloActor extends Actor {
  val logger = Logging(context.system, this)

  def receive: Receive = {
    case WaitThenSay(delay, text) =>
      // Blocks the thread that is running this actor for `delay` milliseconds.
      Thread.sleep(delay)
      logger.info(s"received $text")
  }
}

ちなみにThread.sleep でブロッキングしちゃってるけど、ブロッキングするようなタスクについては注意点もあるので Actor Systems — Akka Documentation の「Blocking Needs Careful Management」の節を参照のこと。

  • エントリポイントを修正
package org.hachipjipm.sandbox

import akka.actor.ActorSystem
import akka.actor.Props

/** Entry point: two independent WaitingHelloActor instances, one message each. */
object AkkaTutor extends App {
  val system = ActorSystem("akka-tutor")

  val actorA = system.actorOf(Props(classOf[WaitingHelloActor]))
  val actorB = system.actorOf(Props(classOf[WaitingHelloActor]))

  // actorA is asked to wait twice as long as actorB.
  actorA ! WaitThenSay(sleep = 2000, message = "i'm actorA")
  actorB ! WaitThenSay(sleep = 1000, message = "i'm actorB")
}

actor が一個だけだとどうなるのん

package org.hachipjipm.sandbox

import akka.actor.ActorSystem
import akka.actor.Props

/** Entry point: a single WaitingHelloActor receives two messages back to back. */
object AkkaTutor extends App {
  val system = ActorSystem("akka-tutor")

  val actorA = system.actorOf(Props(classOf[WaitingHelloActor]))

  // Each send is followed by a println from the main thread.
  actorA ! WaitThenSay(sleep = 2000, message = "first message")
  Console.println("sent first message")
  actorA ! WaitThenSay(sleep = 1000, message = "second message")
  Console.println("sent second message")
}
  • アクターは同時にひとつのメッセージしか処理しない。
  • メッセージは mailbox というところに届いていて、そこに届いたメッセージを順に処理していく。
  • メッセージ到達順序や実行順序の保証について、詳しくは Message Delivery Reliability — Akka Documentation を参照のこと

ask

package org.hachipjipm.sandbox

import akka.actor.{Props, Actor}
import akka.event.Logging
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

/** Tells [[ActorA]] to begin a Ping/Pong exchange with its child. */
case object Start
/** Request sent by [[ActorA]] to [[ActorB]]. */
case object Ping
/** Reply sent by [[ActorB]] in answer to [[Ping]]. */
case object Pong

/**
 * Actor that, on [[Start]], asks its child [[ActorB]] a [[Ping]] and logs the outcome.
 *
 * The ask (`?`) returns a Future; it is completed with the reply, or failed
 * when no reply arrives within the timeout.
 */
class ActorA extends Actor {
  val logger = Logging(context.system, self)

  // Created through context.actorOf, so ActorB is a child of this actor.
  val b = context.actorOf(Props[ActorB])

  def receive: Receive = {
    case Start =>
      // Method-call syntax avoids the postfix-operator form `5 second`.
      implicit val timeout: Timeout = Timeout(5.seconds)
      // Execution context on which the Future callback below runs.
      implicit val dispatcher: scala.concurrent.ExecutionContext = context.dispatcher

      // Log both outcomes; a bare foreach would silently ignore a failed ask.
      (b ? Ping).onComplete {
        case scala.util.Success(message) => logger.info(message.toString)
        case scala.util.Failure(cause)   => logger.error(cause, "ask for Ping failed")
      }
  }

}

/** Actor that logs every Ping it receives and answers the sender with Pong. */
class ActorB extends Actor {
  val logger = Logging(context.system, self)

  def receive: Receive = {
    case Ping =>
      logger.info(Ping.toString)

      // sender() gives the ActorRef of the actor that sent the current message.
      val replyTo = sender()
      replyTo ! Pong
  }
}
package org.hachipjipm.sandbox

import akka.actor.ActorSystem
import akka.actor.Props

/** Entry point: creates ActorA under the name "root" and sends it Start. */
object AkkaTutor extends App {
  val system = ActorSystem("akka-tutor")

  val a = system.actorOf(Props(classOf[ActorA]), name = "root")

  a ! Start
}

Let It Crash!

非同期でめんどうなことのうちのひとつに例外処理がありますね。例外はどう扱われるのか

package org.hachipjipm.sandbox

import akka.actor.Actor
import akka.event.Logging

/** Messages understood by [[ReportActor]]. */
object ReportActor {
  /** Asks the actor to log `message`. */
  case class Say(message: String)
}

/** Actor that logs the text of every Say and throws on any other message. */
class ReportActor extends Actor {
  import ReportActor.Say

  val logger = Logging(context.system, this)

  def receive: Receive = {
    case Say(text) => logger.info(text)

    // Anything that is not a Say is treated as an error.
    case _ => throw new Exception("i don't know how to handle message")
  }
}
package org.hachipjipm.sandbox

import akka.actor.{Props, Actor}
import akka.event.Logging

/** Messages understood by [[RootActor]]. */
object RootActor {
  /** Starts the demonstration. */
  case object Run
}

/** Actor that, on Run, creates a child ReportActor and sends it three messages. */
class RootActor extends Actor {
  import RootActor.Run

  val logger = Logging(context.system, this)

  def receive: Receive = {
    case Run =>
      val reporter = context.actorOf(Props(classOf[ReportActor]))

      // The second message is a plain String rather than a Say.
      reporter ! ReportActor.Say("にゃん")
      reporter ! "にゃん"
      reporter ! ReportActor.Say("にゃん")
  }
}

「!?」壊れたはずのアクターが生きてる!!!!!これを説明します。

アクターのライフサイクル

ここ の Actor Lifecycle を読むとくわしいことが書いてある。わたしのことばでまとめ直すと、

  • Actor は一度作られたら明示的に止める(方法は後述)しないと止まらない。
    • 例外吐いても止まらない。デフォルトの挙動では restart される。
    • restart されるときは、まず例外を吐いた古い actor instance の preRestart が呼ばれ、そのあと新しい actor instance が作られる。
    • 例外吐いた actor は、この新しい instance で置き換えられる
    • 新しい instance の postRestart が呼ばれる
      • この仕組みのために、actorがActorRefにwrapされてるわけですね。賢い。

実際に確かめてみよう

package org.hachipjipm.sandbox

import akka.actor.Actor
import akka.event.Logging

/** Messages understood by [[ReportActor]]. */
object ReportActor {
  /** Asks the actor to log `message`. */
  case class Say(message: String)
}

/**
 * ReportActor variant that logs from its constructor and from each lifecycle
 * hook it overrides, so that the order of the calls can be read from the log.
 */
class ReportActor extends Actor {
  import ReportActor.Say

  val logger = Logging(context.system, this)

  // Runs as part of construction of every instance.
  logger.info("initializing actor...")

  override def preStart(): Unit =
    logger.info("preStart is called")

  /**
   * @param reason  the exception that caused the restart
   * @param message the message that was being processed when the restart
   *                happened, if there was one
   */
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    super.preRestart(reason, message)
    logger.info("preRestart is called")
  }

  /** @param reason the exception that caused the restart */
  override def postRestart(reason: Throwable): Unit = {
    super.postRestart(reason)
    logger.info("postRestart is called")
  }

  def receive: Receive = {
    case Say(text) => logger.info(text)

    // Anything that is not a Say is treated as an error.
    case _ => throw new Exception("i don't know how to handle message")
  }
}

アクター止める方法

  • PoisonPill を送る
package org.hachipjipm.sandbox

import akka.actor.{PoisonPill, Terminated, Props, Actor}
import akka.event.Logging

/** Messages understood by [[RootActor]]. */
object RootActor {
  /** Starts the demonstration. */
  case object Run
}

/** Actor that, on Run, creates a child ReportActor and sends it a PoisonPill between two Says. */
class RootActor extends Actor {
  import RootActor.Run

  val logger = Logging(context.system, this)

  def receive: Receive = {
    case Run =>
      // Created via context.actorOf, so RootActor becomes the child's supervisor.
      val reporter = context.actorOf(Props(classOf[ReportActor]))

      // The last Say is sent after the PoisonPill.
      reporter ! ReportActor.Say("にゃん")
      reporter ! PoisonPill
      reporter ! ReportActor.Say("にゃん")
  }
}
package org.hachipjipm.sandbox

import akka.actor.Actor
import akka.event.Logging

/** Messages understood by [[ReportActor]]. */
object ReportActor {
  /** Asks the actor to log `message`. */
  case class Say(message: String)
}

/** Actor that logs the text of every Say and throws on any other message. */
class ReportActor extends Actor {
  import ReportActor.Say

  val logger = Logging(context.system, this)

  def receive: Receive = {
    case Say(text) => logger.info(text)

    // Anything that is not a Say is treated as an error.
    case _ => throw new Exception("i don't know how to handle message")
  }
}
  • 自分で止まる
package org.hachipjipm.sandbox

import akka.actor.{PoisonPill, Terminated, Props, Actor}
import akka.event.Logging

/** Messages understood by [[RootActor]]. */
object RootActor {
  /** Starts the demonstration. */
  case object Run
}

/** Actor that, on Run, creates a child ReportActor and sends it two Say messages. */
class RootActor extends Actor {
  import RootActor.Run

  val logger = Logging(context.system, this)

  def receive: Receive = {
    case Run =>
      // Created via context.actorOf, so RootActor becomes the child's supervisor.
      val reporter = context.actorOf(Props(classOf[ReportActor]))

      reporter ! ReportActor.Say("にゃん")
      reporter ! ReportActor.Say("にゃん")
  }
}
package org.hachipjipm.sandbox

import akka.actor.Actor
import akka.event.Logging

/** Messages understood by [[ReportActor]]. */
object ReportActor {
  /** Asks the actor to log `message`. */
  case class Say(message: String)
}

/** Actor that logs a Say and then stops itself; any other message makes it throw. */
class ReportActor extends Actor {
  import ReportActor.Say

  val logger = Logging(context.system, this)

  def receive: Receive = {
    case Say(text) =>
      logger.info(text)
      // `self` is this actor's own ActorRef.
      context.stop(self)

    // Anything that is not a Say is treated as an error.
    case _ => throw new Exception("i don't know how to handle message")
  }
}

スーパーバイザーツリー

Actorを作るときには ActorSystem か ActorContext に対して actorOf を呼んで作る。このとき、

  • ActorSystem#actorOf で作られたアクターはユーザー空間のトップレベルのアクターになる。
  • ActorContext#actorOf で作られたアクターは、その context を保持するActorによって監視される

監視してるマンは、監視されてるマンの例外を捕捉して、どうハンドルするかを決められる

package org.hachipjipm.sandbox

import akka.actor.Actor
import akka.event.Logging

/** Messages and failure types of [[ReportActor]]. */
object ReportActor {
  /** Asks the actor to log `message`. */
  case class Say(message: String)

  /** Thrown for a message the actor has no case for. */
  class CantHandleMessage(message: String) extends Exception(message)

  /** Thrown once the actor has counted ten "beer" messages. */
  class Drunk(message: String) extends Exception(message)
}

/**
 * Actor that logs Say messages, counts "beer" messages, and fails with
 * different exception types depending on what went wrong.
 */
class ReportActor extends Actor {
  import ReportActor._

  val logger = Logging(context.system, this)

  // Number of "beer" messages this instance has received so far.
  var drunk = 0

  def receive: Receive = {
    case Say(text) => logger.info(text)

    case "beer" =>
      drunk = drunk + 1
      if (drunk >= 10) throw new Drunk("びーるのんでつかいものにならなくなっちゃった")

    case _ => throw new CantHandleMessage("i don't know such a message")
  }
}
package org.hachipjipm.sandbox

import akka.actor._
import akka.event.Logging
import scala.concurrent.duration._

/** Messages understood by [[RootActor]]. */
object RootActor {
  /** Starts the demonstration. */
  case object Run
}

/**
 * Supervisor of a child ReportActor that decides per exception type what
 * happens to the failed child.
 */
class RootActor extends Actor {
  import RootActor._

  val logger = Logging(context.system, this)

  // Restart the child on CantHandleMessage, stop it on Drunk. The decider has no
  // case for other exception types (see Akka's Fault Tolerance documentation for
  // what the strategy does then).
  // `1.minute` replaces the postfix-operator form `1 minutes`, which needs
  // scala.language.postfixOps to compile without a feature warning.
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: ReportActor.CantHandleMessage => SupervisorStrategy.Restart
      case _: ReportActor.Drunk             => SupervisorStrategy.Stop
    }

  def receive: Receive = {
    case Run =>
      // Created via context.actorOf, so RootActor becomes the child's supervisor.
      val actor = context.actorOf(Props[ReportActor])

      actor ! ReportActor.Say("にゃん")
      actor ! "にゃん" // not a Say and not "beer": hits ReportActor's catch-all case
      // Ten beers reach the threshold at which ReportActor throws Drunk.
      (1 to 10).foreach(_ => actor ! "beer")
      actor ! ReportActor.Say("にゃん")
  }
}

さらに詳しいことはFault Tolerance — Akka Documentationを読むと書いてある

落ち穂拾い

はじめてのSpray IO

// sbt build definition for the spray-can streaming chat sample.
name := "StreamingChat"

version := "1.0"

scalaVersion := "2.10.4"

// Additional repositories consulted when resolving dependencies.
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

resolvers += "spray repo" at "http://repo.spray.io"

// `%%` appends the Scala binary version to the artifact name.
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.3.1"

// Single `%`: the artifact name is used exactly as written.
libraryDependencies += "io.spray" % "spray-can" % "1.3.1"
import akka.actor.{Props, ActorSystem}

/** Entry point: starts the actor system and creates the top-level RootActor. */
object Main extends App {
  val system = ActorSystem("system")

  val root = system.actorOf(Props(classOf[RootActor]))
}
import akka.actor.{Actor, Props}
import akka.io.IO
import spray.can.Http

/**
 * Creates the [[Server]] listener actor and asks spray-can to bind an HTTP
 * endpoint on localhost:8080 for it.
 *
 * Created by shinpei on 2014/04/06.
 */
class RootActor extends Actor {
  val server = context.system.actorOf(Props[Server])

  // spray-can answers a Bind (Http.Bound / Http.CommandFailed) to the sender of
  // the command. Sending it on behalf of `server` lets Server's Http.Bound case
  // see the reply. With `!` the reply would come to this actor and be swallowed
  // by the catch-all case below.
  IO(Http)(context.system).tell(
    Http.Bind(server, interface = "localhost", port = 8080),
    server
  )

  // This actor deliberately ignores every message it receives.
  def receive: Receive = {
    case _ => ()
  }
}

import akka.actor.{Props, Actor}
import akka.event.Logging
import spray.can.Http

/**
 * Listener actor for the HTTP endpoint: logs when the endpoint is bound and
 * registers a fresh ConnectionHandler child for every new connection.
 */
class Server extends Actor {
  val logger = Logging(context.system, self)

  def receive: Receive = {
    case _: Http.Bound =>
      logger.info("bound")

    case _: Http.Connected =>
      // The sender of Connected is the actor to which the handler is registered.
      val connection = sender()
      val perConnectionHandler = context.actorOf(Props(classOf[ConnectionHandler]))
      connection ! Http.Register(perConnectionHandler)
  }
}
import akka.actor.Actor
import spray.http.{Uri, HttpResponse, HttpRequest}
import spray.http.HttpMethods._

/** Answers `GET /` with a fixed "hello world" response. */
class ConnectionHandler extends Actor {
  def receive: Receive = {
    case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
      val reply = HttpResponse(status = 200, entity = "hello world")
      sender() ! reply
  }
}

Chunked Responses

import akka.actor.Actor
import spray.http._
import spray.http.ChunkedResponseStart
import spray.http.HttpMethods._
import spray.http.HttpRequest

/**
 * Answers `GET /` with a chunked response: the numbers 1 to 10, one chunk per
 * second, each terminated by CRLF.
 */
class ConnectionHandler extends Actor {
  def receive: Receive = {
    case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
      // All parts of the response go to the sender of the request.
      val client = sender()

      client ! ChunkedResponseStart(HttpResponse(status = 200, entity = ""))

      (1 to 10).foreach { n =>
        // Blocks this actor's thread between chunks.
        Thread.sleep(1000)
        client ! MessageChunk(n.toString + "\r\n")
      }

      client ! ChunkedMessageEnd
  }
}

GET で Streaming, POST で投稿

import akka.actor.{ActorRef, Actor}

/** Messages understood by [[Publisher]]. */
object Publisher {
  /** Adds the sender to the subscriber set. */
  case object Subscribe
  /** Removes the sender from the subscriber set. */
  case object UnSubscribe
  /** Forwards `message` to every current subscriber. */
  case class Publish(message: String)
}

/** Keeps a set of subscriber actors and fans published messages out to them. */
class Publisher extends Actor {
  import Publisher._

  var subscribers: Set[ActorRef] = Set.empty

  def receive: Receive = {
    case Subscribe =>
      subscribers = subscribers + sender()

    case UnSubscribe =>
      subscribers = subscribers - sender()

    case Publish(message) =>
      // Each subscriber receives the text wrapped in ConnectionHandler.SendMessage.
      for (subscriber <- subscribers) {
        subscriber ! ConnectionHandler.SendMessage(message)
      }
  }
}
import akka.actor.{Props, ActorRef, Actor}
import akka.event.Logging
import spray.can.Http
import spray.http._
import spray.http.HttpMethods._
import spray.http.HttpRequest

/** Messages and factory for [[ConnectionHandler]]. */
object ConnectionHandler {
  /** Text to be written to the handler's open streaming connection. */
  case class SendMessage(message: String)

  /** Props for a handler that publishes to and subscribes at `publisher`. */
  def props(publisher: ActorRef): Props = {
    Props(new ConnectionHandler(publisher))
  }
}

/**
 * Per-connection HTTP handler for the streaming chat.
 *
 *  - `GET /` subscribes this handler at the publisher, starts a chunked
 *    response and switches to streaming mode, in which every SendMessage is
 *    written to the connection as one chunk.
 *  - `POST /` publishes the request body (decoded as UTF-8) and answers with
 *    a default HttpResponse.
 *
 * The handler stops itself once its connection is closed, so that handler
 * actors do not pile up over the lifetime of the server.
 *
 * @param publisher actor that receives the Subscribe / UnSubscribe / Publish messages
 */
class ConnectionHandler(publisher: ActorRef) extends Actor {
  import ConnectionHandler._

  val logger = Logging(context.system, self)

  def receive: Receive = {
    case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
      // Captured now: streaming mode keeps writing to this ActorRef later.
      val conn = sender()

      publisher ! Publisher.Subscribe

      conn ! ChunkedResponseStart(HttpResponse(status = 200, entity = ""))

      context.become(streaming(conn))

    case HttpRequest(POST, Uri.Path("/"), _, entity, _) =>
      publisher ! Publisher.Publish(entity.asString(HttpCharsets.`UTF-8`))
      sender() ! HttpResponse()

    case _: Http.ConnectionClosed =>
      // Closed before streaming started: nothing to unsubscribe, just stop.
      context.stop(self)
  }

  /** Behaviour while a chunked response to `conn` is open. */
  private def streaming(conn: ActorRef): Receive = {
    case SendMessage(message) =>
      conn ! MessageChunk(message + "\r\n")

    case _: Http.ConnectionClosed =>
      logger.info("connection closed")
      publisher ! Publisher.UnSubscribe
      // Nothing is left to do for this connection.
      context.stop(self)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.