Skip to content

Instantly share code, notes, and snippets.

@codetinkerhack
Last active June 18, 2020 09:39
Show Gist options
  • Star 13 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save codetinkerhack/8206481 to your computer and use it in GitHub Desktop.
Save codetinkerhack/8206481 to your computer and use it in GitHub Desktop.
Retry Akka actor /Ask pattern with individual timeout, retry intervals
package com.codetinkerhack
import akka.actor.{ ActorRef, Props, Actor, ActorLogging }
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
import akka.actor.Actor.Receive
import akka.pattern.pipe
import scala.util.Success
import scala.util.Failure
object ReTry {
private case class Retry(originalSender: ActorRef, message: Any, times: Int)
private case class Response(originalSender: ActorRef, result: Any)
def props(tries: Int, retryTimeOut: FiniteDuration, retryInterval: FiniteDuration, forwardTo: ActorRef): Props = Props(new ReTry(tries: Int, retryTimeOut: FiniteDuration, retryInterval: FiniteDuration, forwardTo: ActorRef))
}
class ReTry(val tries: Int, retryTimeOut: FiniteDuration, retryInterval: FiniteDuration, forwardTo: ActorRef) extends Actor with ActorLogging {
import context.dispatcher
import ReTry._
// Retry loop that keep on Re-trying the request
def retryLoop: Receive = {
// Response from future either Success or Failure is a Success - we propagate it back to a original sender
case Response(originalSender, result) =>
originalSender ! result
context stop self
case Retry(originalSender, message, triesLeft) =>
// Process (Re)try here. When future completes it sends result to self
(forwardTo ? message) (retryTimeOut) onComplete {
case Success(result) =>
self ! Response(originalSender, result) // sending responses via self synchronises results from futures that may come potentially in any order. It also helps the case when the actor is stopped (in this case responses will become deadletters)
case Failure(ex) =>
if (triesLeft - 1 == 0) {// In case of last try and we got a failure (timeout) lets send Retries exceeded error
self ! Response(originalSender, Failure(new Exception("Retries exceeded")))
}
else
log.error("Error occurred: " + ex)
}
// Send one more retry after interval
if (triesLeft - 1 > 0)
context.system.scheduler.scheduleOnce(retryInterval, self, Retry(originalSender, message, triesLeft - 1))
case m @ _ =>
log.error("No handling defined for message: " + m)
}
// Initial receive loop
def receive: Receive = {
case message @ _ =>
self ! Retry(sender, message, tries)
// Lets swap to a retry loop here.
context.become(retryLoop, false)
}
}
@jcerdeira
Copy link

Good and simple implementation, I'm going to steal your implementation.

Thanks

@vkrot
Copy link

vkrot commented Nov 3, 2015

line 47, log.error("Error occurred: " + ex) - is it thread safe to access _log field of actor instance in Future callback?

@mithundebnath
Copy link

What is the equivalent java code for this.

@isokissa
Copy link

Line 51-52 should be in Failure case, else branch, and not directly after handling of ask pattern, because we want to retry only after we are sure that forwardTo ? message has failed before retrying.

@otto-dev
Copy link

Ima steal and modify to save some time. Thanks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment