Created
November 6, 2014 15:40
-
-
Save antigremlin/7680b4294c9b9e128b60 to your computer and use it in GitHub Desktop.
Ask with retry in Akka, returning a Future<Object> that is completed through a Promise
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package spike.app2; | |
import akka.actor.ActorContext; | |
import akka.actor.ActorRef; | |
import akka.actor.Props; | |
import akka.actor.ReceiveTimeout; | |
import akka.actor.UntypedActor; | |
import akka.dispatch.Futures; | |
import akka.pattern.AskTimeoutException; | |
import scala.concurrent.Future; | |
import scala.concurrent.Promise; | |
import scala.concurrent.duration.Duration; | |
public class Patterns { | |
public static Future<Object> askRetry(ActorContext context, ActorRef target, | |
Object msg, Duration duration, int retires) { | |
Promise<Object> promise = Futures.promise(); | |
context.actorOf(Props.create(RetryActor.class, target, msg, duration, retires, promise)); | |
return promise.future(); | |
} | |
static class RetryActor extends UntypedActor { | |
private final Promise<Object> promise; | |
private final ActorRef target; | |
private final Object msg; | |
private final Duration duration; | |
private int retries; | |
RetryActor(ActorRef target, Object msg, Duration duration, int retries, | |
Promise<Object> promise) { | |
this.promise = promise; | |
this.target = target; | |
this.msg = msg; | |
this.duration = duration; | |
this.retries = retries; | |
} | |
@Override public void preStart() { | |
getContext().setReceiveTimeout(duration); | |
target.tell(msg, getSelf()); | |
} | |
@Override public void onReceive(Object rmsg) { | |
if (rmsg instanceof ReceiveTimeout) { | |
if (retries > 0) { | |
target.tell(msg, getSelf()); | |
retries--; | |
} else { | |
promise.failure(new AskTimeoutException("")); | |
getContext().stop(getSelf()); | |
} | |
} else { | |
promise.success(rmsg); | |
getContext().stop(getSelf()); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment