Created
November 6, 2018 16:58
-
-
Save shankarshastri/1254d9105d167163873f297c88de1698 to your computer and use it in GitHub Desktop.
AkkaRetryActor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.{Actor, ActorRef, ActorSystem, CoordinatedShutdown, Props} | |
import scala.concurrent.Future | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import scala.math.BigInt | |
import scala.util.Success | |
import akka.pattern._ | |
import akka.util.Timeout | |
import scala.concurrent.duration._ | |
class RetryMsg[B](val count: BigInt, val f: () => Future[B]) { | |
override def toString: String = { | |
s"count: ${count}" | |
} | |
} | |
object RetryMsg { | |
def apply[B](count: BigInt, f: () => Future[B]): RetryMsg[B] = new RetryMsg(count, f) | |
def unapply[B](arg: RetryMsg[B]): Option[(BigInt, () => Future[B])] = { | |
Some(arg.count, arg.f) | |
} | |
} | |
object ActorMessagingUtilImplicit { | |
implicit class LogActorMessages(a: ActorRef) { | |
def !!(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = { | |
println(s"Processing: $message") | |
a ! message | |
println(s"Processed: $message") | |
} | |
} | |
} | |
object ActorSystemUtilImplicit { | |
implicit class ActorSystemTerminate(s: ActorSystem) { | |
def registerSystemShutdown() = { | |
CoordinatedShutdown(s).addJvmShutdownHook { | |
println("Terminating System") | |
s.terminate() | |
println("Terminated System") | |
} | |
} | |
} | |
} | |
import ActorMessagingUtilImplicit._ | |
import ActorSystemUtilImplicit._ | |
class RetryActor extends Actor { | |
def receive: Receive = { | |
case RetryMsg(c, f) => | |
c match { | |
case t if t == 0 => sender() !! f() | |
case t if t > 0 => f().onComplete { | |
case Success(value) => sender() !! value | |
case _ => self !! RetryMsg(t-1, f) | |
} | |
} | |
} | |
} | |
object RetryActor { | |
def props(): Props = Props(new RetryActor()) | |
} | |
object AkkaRetryActor { | |
def main(args: Array[String]): Unit = { | |
implicit val system = ActorSystem("AkkaRetryActorSystem") | |
system.registerSystemShutdown | |
val retryActor = system.actorOf(RetryActor.props(), "retryActor") | |
implicit val timeOut = Timeout(5 seconds) | |
(retryActor ? RetryMsg(3, () => Future(throw new Exception("hello")))).onComplete { | |
case _ => | |
System.exit(0) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment