Skip to content

Instantly share code, notes, and snippets.

@shankarshastri
Created November 6, 2018 16:58
Show Gist options
  • Save shankarshastri/1254d9105d167163873f297c88de1698 to your computer and use it in GitHub Desktop.
Save shankarshastri/1254d9105d167163873f297c88de1698 to your computer and use it in GitHub Desktop.
AkkaRetryActor
import akka.actor.{Actor, ActorRef, ActorSystem, CoordinatedShutdown, Props}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.math.BigInt
import scala.util.Success
import akka.pattern._
import akka.util.Timeout
import scala.concurrent.duration._
/**
 * Message pairing an asynchronous operation with a retry counter.
 *
 * @param count counter carried with the message (the receiving actor decrements it per retry)
 * @param f     thunk that starts the asynchronous operation every time it is invoked
 * @tparam B    result type produced by the operation
 */
class RetryMsg[B](val count: BigInt, val f: () => Future[B]) {

  /** Renders only the counter; the function value is left out of the text. */
  override def toString: String = s"count: $count"
}

/** Companion offering construction without `new` and an extractor for pattern matching. */
object RetryMsg {

  /** Builds a [[RetryMsg]] from a counter and an operation thunk. */
  def apply[B](count: BigInt, f: () => Future[B]): RetryMsg[B] =
    new RetryMsg[B](count, f)

  /** Extractor that always succeeds, yielding the counter and the thunk as a pair. */
  def unapply[B](arg: RetryMsg[B]): Option[(BigInt, () => Future[B])] =
    Some((arg.count, arg.f))
}
/** Syntax extension that wraps an actor send with console logging. */
object ActorMessagingUtilImplicit {

  /** Adds the logging send operator `!!` to any [[akka.actor.ActorRef]]. */
  implicit class LogActorMessages(a: ActorRef) {

    /**
     * Sends `message` to the wrapped actor, printing one line before and one line after the send.
     *
     * The second line is printed as soon as the send call returns; it does not indicate that the
     * receiver has handled the message.
     *
     * @param message payload to deliver
     * @param sender  sender reference attached to the message (defaults to `Actor.noSender`)
     */
    def !!(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = {
      println(s"Processing: $message")
      // Explicit form of `a ! message` with the implicit sender passed by hand.
      a.tell(message, sender)
      println(s"Processed: $message")
    }
  }
}
/** Syntax extension for wiring an actor system into JVM shutdown. */
object ActorSystemUtilImplicit {

  /** Adds `registerSystemShutdown()` to any [[akka.actor.ActorSystem]]. */
  implicit class ActorSystemTerminate(s: ActorSystem) {

    /**
     * Registers a JVM shutdown hook, through the system's `CoordinatedShutdown`, that prints a
     * line, calls `terminate()` on the system and prints a second line.
     *
     * The second line is printed right after `terminate()` returns; the hook does not wait on
     * the value `terminate()` returns.
     */
    def registerSystemShutdown(): Unit = {
      // Hook body kept as a local method; it only runs when the JVM starts shutting down.
      def announceAndTerminate(): Unit = {
        println("Terminating System")
        s.terminate()
        println("Terminated System")
      }

      CoordinatedShutdown(s).addJvmShutdownHook(announceAndTerminate())
    }
  }
}
import ActorMessagingUtilImplicit._
import ActorSystemUtilImplicit._
/**
 * Actor that runs the asynchronous operation carried by a [[RetryMsg]] and retries it on failure.
 *
 * Protocol:
 *  - While the counter is positive, the operation is invoked; a successful result is sent to the
 *    original requester, a failure re-enqueues the message with the counter decremented by one.
 *  - Once the counter is zero (or negative), the operation is invoked one last time and its
 *    outcome is piped to the requester: the value on success, an `akka.actor.Status.Failure` on
 *    failure (which fails the future of an `ask`).
 *
 * A message with counter `n >= 0` therefore triggers at most `n + 1` invocations.
 */
class RetryActor extends Actor {
  def receive: Receive = {
    case RetryMsg(remaining, f) =>
      // Capture the requester while still on the actor's thread. `sender()` reads mutable actor
      // state and must not be called from a Future callback, which runs on another thread and
      // possibly while a different message is being processed.
      val replyTo = sender()

      if (remaining > 0) {
        f().onComplete {
          case Success(value) =>
            replyTo !! value
          case _ =>
            // Keep the original requester as the sender of the retry message; otherwise the
            // final reply would be addressed to this actor instead of the requester.
            self.!!(RetryMsg(remaining - 1, f))(replyTo)
        }
      } else {
        // Last attempt. Deliver the outcome of the future rather than the Future object itself.
        // Negative counters also end up here instead of failing with a MatchError.
        f().pipeTo(replyTo)
      }
  }
}
/** Companion holding the `Props` factory for [[RetryActor]]. */
object RetryActor {

  /**
   * Creates the `Props` describing how to instantiate a [[RetryActor]].
   *
   * @return a `Props` whose creator builds a new `RetryActor` with its no-argument constructor
   */
  def props(): Props =
    Props {
      new RetryActor
    }
}
/** Demo entry point: asks a [[RetryActor]] to run an always-failing operation with 3 retries. */
object AkkaRetryActor {

  /**
   * Starts the actor system, registers the shutdown hook, sends one [[RetryMsg]] via `ask` and
   * exits the JVM with status 0 once the ask future completes (successfully, with a failure, or
   * by timing out after 5 seconds).
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("AkkaRetryActorSystem")
    // Side-effecting method: call it with parentheses, matching its declaration.
    system.registerSystemShutdown()

    val retryActor = system.actorOf(RetryActor.props(), "retryActor")

    // Dot notation avoids the postfix-operator language feature that `5 seconds` requires.
    implicit val timeOut: Timeout = Timeout(5.seconds)

    val reply = retryActor ? RetryMsg(3, () => Future(throw new Exception("hello")))

    // The outcome is irrelevant for the demo; any completion ends the process.
    reply.onComplete(_ => System.exit(0))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment