Skip to content

Instantly share code, notes, and snippets.

@ponkotuy
Last active December 16, 2015 04:49
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ponkotuy/5379540 to your computer and use it in GitHub Desktop.
Save ponkotuy/5379540 to your computer and use it in GitHub Desktop.
Akka ActorでPipeline処理
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor._
import akka.pattern.gracefulStop
// Entry point: wires up a three-stage actor pipeline (a1 -> b1 -> a2), pushes
// ten string messages into its head, then stops the head and shuts the system down.
object Main extends App {
  // Timeout picked up implicitly by every pipeline stage; also reused below as
  // both the graceful-stop timeout and the Await timeout.
  implicit val duration: FiniteDuration = 5.seconds
  val system = ActorSystem()
  // Each stage is constructed with the ActorRef of the stage it hands data to,
  // so the pipeline is built back to front. `ref` is the head stage (a1).
  val ref = {
    val refA2 = system.actorOf(Props(new A(None)), name = "a2")
    val refB1 = system.actorOf(Props(new B(Some(refA2))), name = "b1")
    system.actorOf(Props(new A(Some(refB1))), name = "a1")
  }
  // Send ten messages ("1" through "10") to the head of the pipeline.
  (1 to 10).foreach(ref ! _.toString)
  // Shutdown: stop the head stage and wait for it; PipeActor.postStop stops the
  // downstream stage in turn. The finally guarantees the ActorSystem is shut
  // down even when the wait fails (e.g. on timeout), so the program does not
  // keep running with a live ActorSystem.
  try Await.result(gracefulStop(ref, duration)(system), duration)
  finally system.shutdown()
}
// Pipeline stage, variant 1. Only String messages are matched by this stage.
class A(next: Option[ActorRef])(implicit duration: FiniteDuration)
    extends PipeActor(next) {
  override def receive: Receive = {
    case text: String =>
      // Block for 100 ms (presumably a stand-in for real work) before reporting.
      Thread.sleep(100)
      val report = "Receive: %s (%s)".format(text, self.path)
      println(report)
      // Hand the message to PipeActor.receive, which passes it to `next`.
      super.receive(text)
  }
}
// Pipeline stage, variant 2. Matches String messages only.
class B(next: Option[ActorRef])(implicit duration: FiniteDuration)
    extends PipeActor(next) {
  override def receive: Receive = {
    case payload: String =>
      // Pause for 100 ms (presumably simulating work), then log the message
      // together with this actor's path.
      Thread.sleep(100)
      val logLine = "Receive: %s (%s)".format(payload, self.path)
      println(logLine)
      // Delegate to the base-class handler, which sends the message on to `next`.
      super.receive(payload)
  }
}
// Base class for the actors that make up a pipeline. As written, a stage has at
// most one downstream stage, so only a straight-line pipeline can be built.
//
// next:     the downstream stage, or None when this is the last stage.
// duration: timeout used when stopping the downstream stage in postStop.
abstract class PipeActor(next: Option[ActorRef])(implicit duration: FiniteDuration)
    extends Actor {
  // Default behaviour: print a marker and pass any message, unchanged, to the
  // downstream stage (a no-op when there is none).
  def receive: Receive = {
    case x =>
      println("Pipe")
      next.foreach(_ ! x)
  }

  // Propagate the stop downstream: when this stage stops, gracefully stop the
  // next stage and block until that stop completes or `duration` elapses.
  // NOTE(review): every stage uses the same `duration`, while an upstream stage
  // waits here for the stage after it — confirm the timeout is long enough for
  // the whole remaining chain.
  override def postStop(): Unit = {
    println("Shutdown")
    next.foreach { it =>
      Await.result(gracefulStop(it, duration)(context.system), duration)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment