Skip to content

Instantly share code, notes, and snippets.

@masahitojp
Last active August 29, 2015 14:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save masahitojp/422be22884c2e575b338 to your computer and use it in GitHub Desktop.
Save masahitojp/422be22884c2e575b338 to your computer and use it in GitHub Desktop.
import akka.actor._
import akka.actor.SupervisorStrategy._
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import akka.util.Timeout
import akka.pattern.{ ask, pipe }
import com.typesafe.config.ConfigFactory
/**
 * Error flag for ErrorOnlyFirstReceiveActor.
 *
 * The flag starts out `true`. ErrorOnlyFirstReceiveActor checks it while
 * handling a message and sets it to `false` in its `preRestart` hook.
 */
object Flag {
  var isError: Boolean = true
}
/**
 * Messages exchanged between Main, Worker, Listener and Worker's child actors.
 */
object WorkerProtocol {
// Sent to Worker to kick things off; Worker records the sender as its progress listener.
case object Start
// Worker sends this to itself after Start to run its message sequence.
case object Do
// Not referenced anywhere in the visible code.
case object Success
// Reply sent by ErrorOnlyFirstReceiveActor; Listener shuts the actor system down on receipt.
case object End
}
/**
 * Actor that ends the run by shutting the actor system down.
 *
 * It does so in two situations: when it receives `End`, or when no message
 * at all has arrived for 15 seconds (signalled by `ReceiveTimeout`).
 */
class Listener extends Actor with ActorLogging {
  import WorkerProtocol._

  // Arm the inactivity timer; a ReceiveTimeout message is delivered once it elapses.
  context.setReceiveTimeout(15.seconds)

  def receive: Receive = {
    case ReceiveTimeout =>
      // Nothing arrived within 15 seconds: give up on the run.
      log.error("Shutting down due to unavailable service")
      context.system.shutdown()

    case End =>
      log.info("Shutting down due to receive End message")
      context.system.shutdown()
  }
}
// State and data markers. Nothing in the visible code references them.
// NOTE(review): they look like leftovers from an FSM-based variant — confirm before removing.

// States
sealed trait State
case object Idle extends State
case object Active extends State

// State data
sealed trait Data
case object Uninitialized extends Data
case object Initialized extends Data
/**
 * Parent actor that drives the demo and supervises the two child actors.
 *
 * On the first `Start` it records the sender as the progress listener and
 * sends itself `Do`. On `Do` it messages the healthy child, sends "error!"
 * to the error-prone child, then asks that child "success!" and pipes the
 * `End` reply to the progress listener.
 */
class Worker extends Actor with ActorLogging {
  import context.dispatcher
  import WorkerProtocol._

  // A child failing with FirstErrorException is restarted; this decider matches nothing else.
  override val supervisorStrategy = OneForOneStrategy() {
    case _: FirstErrorException => Restart
  }

  // Set by the first Start message; None until then.
  var progressListener: Option[ActorRef] = None
  val errorOnlyFirstReceived = context.actorOf(Props[ErrorOnlyFirstReceiveActor])
  val healthy = context.actorOf(Props[HealthyActor])

  override def receive: Receive = {
    // Only the first Start is accepted; once a listener is registered the guard fails.
    case Start if progressListener.isEmpty =>
      log.info("-----start()-----")
      progressListener = Option(sender)
      self ! Do

    case Do =>
      log.info("-----HealthyReceiveActor receive-----")
      healthy ! "success!"
      // The sleeps space out the demo steps; they block this actor's thread meanwhile.
      TimeUnit.SECONDS.sleep(1)

      log.info("-----ErrorOnlyFirstReceiveActor receive(Error)-----")
      errorOnlyFirstReceived ! "error!"
      TimeUnit.SECONDS.sleep(1)

      log.info("-----ErrorOnlyFirstReceiveActor receive(Success)-----")
      implicit val askTimeout: Timeout = Timeout(5.seconds)
      // Any reply other than End fails the future with a MatchError.
      val reply = (errorOnlyFirstReceived ? "success!").map { case End => End }

      // Do may arrive before any Start; handle the missing listener explicitly
      // instead of calling Option.get, which would throw NoSuchElementException.
      progressListener match {
        case Some(listener) => reply pipeTo listener
        case None           => log.warning("No progress listener registered; dropping result")
      }
  }
}
/**
 * Exception type that Worker's supervisor strategy maps to a Restart directive.
 * ErrorOnlyFirstReceiveActor creates it while Flag.isError is set.
 */
class FirstErrorException extends Exception
/**
 * Actor that fails the first time it handles a message.
 *
 * While Flag.isError is set, handling a String message throws
 * FirstErrorException. The `preRestart` hook clears the flag, so after a
 * restart messages are handled normally and answered with
 * WorkerProtocol.End.
 */
class ErrorOnlyFirstReceiveActor extends Actor with ActorLogging {
  def receive = {
    case msg: String =>
      log.info(msg + " by ErrorOnlyFirstReceiveActor")
      // The exception has to be thrown, not merely constructed; otherwise the
      // actor never fails and the supervisor's restart path is never taken.
      if (Flag.isError) throw new FirstErrorException()
      log.info("ErrorOnlyFirstReceiveActor receive done.")
      sender ! WorkerProtocol.End
  }

  // Runs when this actor is restarted after a failure; disarms the error flag.
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info("ErrorOnlyFirstReceiveActor pre Restart.")
    Flag.isError = false
  }

  // Only logs that the restart happened.
  override def postRestart(reason: Throwable): Unit = {
    log.info("ErrorOnlyFirstReceiveActor post Restart.")
  }
}
/**
 * Ordinary actor: logs every message it receives and never fails on purpose.
 * Its restart hooks only log that they were called.
 */
class HealthyActor extends Actor with ActorLogging {
  def receive: Receive = {
    case msg => log.info(s"$msg by HealthyReceiveActor")
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit =
    log.info("HealthyActor pre Restart.")

  override def postRestart(reason: Throwable): Unit =
    log.info("HealthyActor post Restart.")
}
/**
 * Entry point of the demo.
 *
 * Creates the "test" actor system with SLF4J logging configured, starts the
 * worker and listener actors, and sends `Start` to the worker with the
 * listener as the sender.
 */
object Main extends App {
  import WorkerProtocol._

  // stripMargin removes the leading "|" margins, leaving the same config text
  // as an unindented literal would.
  val config = ConfigFactory.parseString(
    """
      |akka {
      |loggers = ["akka.event.slf4j.Slf4jLogger"]
      |loglevel = "DEBUG"
      |logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
      |}
      |""".stripMargin)

  val as = ActorSystem("test", config)
  val worker = as.actorOf(Props(new Worker), "worker")
  val listener = as.actorOf(Props(new Listener), "listener")

  // The listener is passed as the sender, so the worker reports to it.
  worker.tell(Start, listener)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment