Skip to content

Instantly share code, notes, and snippets.

@antigremlin
Created November 6, 2014 15:40
Show Gist options
  • Save antigremlin/7680b4294c9b9e128b60 to your computer and use it in GitHub Desktop.
Save antigremlin/7680b4294c9b9e128b60 to your computer and use it in GitHub Desktop.
Ask with retry in Akka, returning a Future<Object> (backed internally by a Promise)
package spike.app2;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import akka.actor.UntypedActor;
import akka.dispatch.Futures;
import akka.pattern.AskTimeoutException;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.duration.Duration;
/**
 * Actor-based "ask" helper that re-sends the request when no reply arrives in time.
 */
public class Patterns {

    /**
     * Sends {@code msg} to {@code target} and returns a future holding the first reply.
     *
     * <p>A short-lived child actor is created in {@code context}. It sends the message,
     * waits up to {@code duration} for any reply and, on each receive timeout, re-sends
     * the message until {@code retries} re-sends have been used up. The next timeout
     * after that fails the future with an {@link AskTimeoutException}.
     *
     * <p>NOTE(review): any non-timeout message counts as the reply and completes the
     * future successfully, including failure-style replies; confirm whether callers
     * expect those to fail the future instead.
     *
     * @param context  actor context in which the helper actor is created
     * @param target   actor that receives the request
     * @param msg      request message; it is re-sent unchanged on every retry
     * @param duration how long to wait for a reply before each retry / the final failure
     * @param retries  number of re-sends after the initial attempt; values {@code <= 0}
     *                 mean "send once, never retry"
     * @return a future completed with the first reply, or failed with an
     *         {@link AskTimeoutException}
     * @throws NullPointerException if {@code context}, {@code target}, {@code msg} or
     *                              {@code duration} is {@code null}
     */
    public static Future<Object> askRetry(ActorContext context, ActorRef target,
            Object msg, Duration duration, int retries) {
        // Fail fast in the caller: a null here would otherwise only blow up inside the
        // helper actor's preStart, leaving the returned future uncompleted.
        requireNonNull(context, "context");
        requireNonNull(target, "target");
        requireNonNull(msg, "msg");
        requireNonNull(duration, "duration");

        Promise<Object> promise = Futures.promise();
        context.actorOf(Props.create(RetryActor.class, target, msg, duration, retries, promise));
        return promise.future();
    }

    /** Throws a {@link NullPointerException} naming the argument when {@code value} is null. */
    private static void requireNonNull(Object value, String name) {
        if (value == null) {
            throw new NullPointerException(name + " must not be null");
        }
    }

    /**
     * One-shot actor that performs a single ask-with-retry exchange and then stops itself.
     */
    static class RetryActor extends UntypedActor {
        private final Promise<Object> promise;
        private final ActorRef target;
        private final Object msg;
        private final Duration duration;
        /** Retry budget as requested by the caller; kept only for the timeout message. */
        private final int maxRetries;
        /** Re-sends still available. */
        private int retries;

        RetryActor(ActorRef target, Object msg, Duration duration, int retries,
                Promise<Object> promise) {
            this.promise = promise;
            this.target = target;
            this.msg = msg;
            this.duration = duration;
            this.maxRetries = retries;
            this.retries = retries;
        }

        /** Arms the receive timeout and performs the initial send. */
        @Override
        public void preStart() {
            getContext().setReceiveTimeout(duration);
            target.tell(msg, getSelf());
        }

        /**
         * A {@link ReceiveTimeout} triggers a retry or, once the budget is exhausted,
         * the failure; any other message is treated as the reply.
         */
        @Override
        public void onReceive(Object rmsg) {
            if (rmsg instanceof ReceiveTimeout) {
                if (retries > 0) {
                    retries--;
                    target.tell(msg, getSelf());
                } else {
                    promise.tryFailure(new AskTimeoutException(timeoutMessage()));
                    getContext().stop(getSelf());
                }
            } else {
                promise.trySuccess(rmsg);
                getContext().stop(getSelf());
            }
        }

        /**
         * Guarantees the future is completed: if this actor is stopped before a reply or
         * the final timeout was seen, the promise is failed instead of being left pending.
         */
        @Override
        public void postStop() {
            if (!promise.isCompleted()) {
                promise.tryFailure(new AskTimeoutException(
                        "Retry actor stopped before [" + target + "] replied"));
            }
        }

        /** Builds the failure text for an exhausted retry budget. */
        private String timeoutMessage() {
            int attempts = 1 + Math.max(maxRetries, 0);
            return "No reply from [" + target + "] within [" + duration + "] after "
                    + attempts + " attempt(s)";
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment