AkkaRetryActor
import akka.actor.{Actor, ActorRef, ActorSystem, CoordinatedShutdown, Props}
import akka.actor.Status
import akka.pattern._
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.math.BigInt
import scala.util.Failure
import scala.util.Success
/**
 * Message pairing an asynchronous operation with a numeric count.
 *
 * @param count number carried alongside the operation (rendered by `toString`)
 * @param f     thunk that, when invoked, starts the operation and returns its `Future`
 * @tparam B    type of the value the operation's `Future` completes with
 */
class RetryMsg[B](val count: BigInt, val f: () => Future[B]) {

  /** Renders only the count; the thunk has no meaningful textual form. */
  override def toString: String = "count: " + count
}
/** Companion providing case-class-like construction and extraction for `RetryMsg`. */
object RetryMsg {

  /**
   * Builds a message without `new`.
   *
   * @param count number carried by the message
   * @param f     thunk that starts the asynchronous operation
   * @tparam B    result type of the operation
   * @return a new `RetryMsg` holding `count` and `f`
   */
  def apply[B](count: BigInt, f: () => Future[B]): RetryMsg[B] = new RetryMsg[B](count, f)

  /**
   * Extractor enabling `case RetryMsg(count, f) =>` patterns.
   *
   * @param arg message to take apart
   * @return always `Some` of the `(count, f)` pair
   */
  def unapply[B](arg: RetryMsg[B]): Option[(BigInt, () => Future[B])] =
    // Build the tuple explicitly: `Some(a, b)` only compiles through argument
    // auto-tupling, which the compiler flags under -Xlint:adapted-args.
    Some((arg.count, arg.f))
}
/** Syntax extension that traces message sends on standard output. */
object ActorMessagingUtilImplicit {

  /** Adds the logging send operator `!!` to an `ActorRef`. */
  implicit class LogActorMessages(a: ActorRef) {

    /**
     * Sends `message` to the wrapped actor, printing a line before and after the send.
     *
     * The second line is printed as soon as the send call returns; it does not
     * indicate that the receiving actor has handled the message.
     *
     * @param message payload to deliver
     * @param sender  actor recorded as the sender; defaults to `Actor.noSender`
     *                when no implicit `ActorRef` is in scope
     */
    def !!(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = {
      println("Processing: " + message)
      a.tell(message, sender)
      println("Processed: " + message)
    }
  }
}
/** Syntax extension for wiring an `ActorSystem` into JVM shutdown. */
object ActorSystemUtilImplicit {

  /** Adds `registerSystemShutdown()` to an `ActorSystem`. */
  implicit class ActorSystemTerminate(s: ActorSystem) {

    /**
     * Registers a JVM shutdown hook, through the system's `CoordinatedShutdown`
     * extension, that calls `terminate()` on the system and prints a line before
     * and after that call.
     *
     * NOTE(review): `terminate()` is presumably asynchronous, so the second line
     * would only mean termination was requested — confirm against the Akka API.
     */
    def registerSystemShutdown() = {
      val coordinatedShutdown = CoordinatedShutdown(s)
      coordinatedShutdown.addJvmShutdownHook {
        println("Terminating System")
        s.terminate()
        println("Terminated System")
      }
    }
  }
}
import ActorMessagingUtilImplicit._ | |
import ActorSystemUtilImplicit._ | |
/**
 * Actor that runs an asynchronous operation and retries it while it keeps failing.
 *
 * Protocol: on `RetryMsg(count, f)` the actor invokes `f()`.
 *  - If the resulting `Future` succeeds, the value is sent to the requester.
 *  - If it fails and `count > 0`, the actor re-sends itself `RetryMsg(count - 1, f)`
 *    on behalf of the original requester.
 *  - If it fails and no retries remain (`count <= 0`), the requester receives
 *    `Status.Failure(cause)`, which makes an `ask` on the caller side fail with
 *    that cause instead of timing out.
 *
 * An exception thrown synchronously by `f()` itself is not caught here.
 */
class RetryActor extends Actor {
  def receive: Receive = {
    case RetryMsg(remaining, f) =>
      // sender() is only valid while this message is being processed. The Future
      // callback below runs later on another thread, so capture the requester now.
      val replyTo = sender()
      f().onComplete {
        case Success(value) =>
          replyTo !! value
        case Failure(_) if remaining > 0 =>
          // Pass the original requester as the sender of the retry message;
          // otherwise the next round would see `self` as sender() and the final
          // reply would never reach whoever asked.
          self.!!(RetryMsg(remaining - 1, f))(replyTo)
        case Failure(cause) =>
          // Retries exhausted (or a non-positive count was supplied): report the
          // failure rather than leaving the requester waiting.
          replyTo !! Status.Failure(cause)
      }
  }
}
/** Companion holding the `Props` factory for `RetryActor`. */
object RetryActor {

  /** @return `Props` that create a fresh `RetryActor` instance */
  def props(): Props = {
    Props(new RetryActor)
  }
}
/** Demo entry point: asks a `RetryActor` to run an always-failing operation. */
object AkkaRetryActor {

  /**
   * Starts the actor system, sends one `RetryMsg` with a count of 3 via `ask`,
   * and exits the JVM with status 0 once the ask completes — with a reply, a
   * failure, or the 5 second timeout.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Implicit definitions carry explicit types so resolution does not depend on
    // inference order.
    implicit val system: ActorSystem = ActorSystem("AkkaRetryActorSystem")
    // Side-effecting zero-arity method: call it with parentheses.
    system.registerSystemShutdown()
    val retryActor = system.actorOf(RetryActor.props(), "retryActor")
    // `5.seconds` avoids the postfix-operator form, which needs a language import.
    implicit val timeOut: Timeout = Timeout(5.seconds)
    (retryActor ? RetryMsg(3, () => Future(throw new Exception("hello")))).onComplete { _ =>
      // The outcome itself is ignored; the demo only needs the ask to finish.
      System.exit(0)
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment