Last active
August 29, 2015 14:06
-
-
Save masahitojp/422be22884c2e575b338 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor._ | |
import akka.actor.SupervisorStrategy._ | |
import java.util.concurrent.TimeUnit | |
import scala.concurrent.duration._ | |
import akka.util.Timeout | |
import akka.pattern.{ ask, pipe } | |
import com.typesafe.config.ConfigFactory | |
/**_ | |
* FirstErrorActor用のエラーフラグ | |
*/ | |
object Flag { | |
var isError = true | |
} | |
object WorkerProtocol { | |
case object Start | |
case object Do | |
case object Success | |
case object End | |
} | |
class Listener extends Actor with ActorLogging { | |
import WorkerProtocol._ | |
context.setReceiveTimeout(15 seconds) | |
def receive = { | |
case End => | |
log.info("Shutting down due to receive End message") | |
context.system.shutdown() | |
case ReceiveTimeout => | |
// No progress within 15 seconds, ServiceUnavailable | |
log.error("Shutting down due to unavailable service") | |
context.system.shutdown() | |
} | |
} | |
// states | |
sealed trait State | |
case object Idle extends State | |
case object Active extends State | |
sealed trait Data | |
case object Uninitialized extends Data | |
case object Initialized extends Data | |
class Worker extends Actor with ActorLogging { | |
import context.dispatcher | |
import WorkerProtocol._ | |
override val supervisorStrategy = OneForOneStrategy() { | |
case _: FirstErrorException => Restart | |
} | |
var progressListener: Option[ActorRef] = None | |
val errorOnlyFirstReceived = context.actorOf(Props[ErrorOnlyFirstReceiveActor]) | |
val healthy = context.actorOf(Props[HealthyActor]) | |
override def receive: Receive = { | |
case Start if progressListener.isEmpty => | |
log.info("-----start()-----") | |
progressListener = Option(sender) | |
self ! Do | |
case Do => { | |
log.info("-----HealthyReceiveActor receive-----") | |
healthy ! "success!" | |
TimeUnit.SECONDS.sleep(1) | |
log.info("-----ErrorOnlyFirstReceiveActor receive(Error)-----") | |
errorOnlyFirstReceived ! "error!" | |
TimeUnit.SECONDS.sleep(1) | |
log.info("-----ErrorOnlyFirstReceiveActor receive(Success)-----") | |
implicit val askTimeout = Timeout(5 seconds) | |
errorOnlyFirstReceived ? "success!" map { | |
case End => End | |
} pipeTo progressListener.get | |
} | |
} | |
} | |
class FirstErrorException extends Exception | |
/** | |
* 1回目はエラーになるアクター | |
*/ | |
class ErrorOnlyFirstReceiveActor extends Actor with ActorLogging { | |
def receive = { | |
case msg :String => log.info(msg + " by ErrorOnlyFirstReceiveActor") | |
if (Flag.isError) new FirstErrorException() | |
log.info("ErrorOnlyFirstReceiveActor receive done.") | |
sender ! WorkerProtocol.End | |
} | |
override def preRestart(reason: Throwable, message: Option[Any]) = { | |
log.info("ErrorOnlyFirstReceiveActor pre Restart.") | |
Flag.isError = false | |
} | |
override def postRestart(reason: Throwable) = { | |
log.info("ErrorOnlyFirstReceiveActor post Restart.") | |
} | |
} | |
/** | |
* 通常のアクター | |
*/ | |
class HealthyActor extends Actor with ActorLogging { | |
def receive = { | |
case msg => | |
log.info(msg + " by HealthyReceiveActor") | |
} | |
override def preRestart(reason: Throwable, message: Option[Any]) = { | |
log.info("HealthyActor pre Restart.") | |
} | |
override def postRestart(reason: Throwable) = { | |
log.info("HealthyActor post Restart.") | |
} | |
} | |
object Main extends App { | |
import WorkerProtocol._ | |
val config = ConfigFactory.parseString(""" | |
akka { | |
loggers = ["akka.event.slf4j.Slf4jLogger"] | |
loglevel = "DEBUG" | |
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" | |
} | |
""") | |
val as = ActorSystem("test", config) | |
val worker = as.actorOf(Props(new Worker), name = "worker") | |
val listener = as.actorOf(Props(new Listener), name = "listener") | |
worker.tell(Start, sender = listener) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment