Skip to content

Instantly share code, notes, and snippets.

@manasdk
Created October 3, 2016 23:56
Show Gist options
  • Save manasdk/ea816f45b26ff4a74f99e2c99116b0b5 to your computer and use it in GitHub Desktop.
Save manasdk/ea816f45b26ff4a74f99e2c99116b0b5 to your computer and use it in GitHub Desktop.
package com.github.rholder.retry;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.WaitStrategies;
import com.github.rholder.retry.WaitStrategy;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.StopStrategy;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Retries a call that returns a {@link ListenableFuture} and reports the final outcome through a
 * {@link ListenableFuture} of its own.
 *
 * <p>Every failed call is wrapped in an {@link Attempt} and offered, in order, to the retry predicate,
 * the {@link StopStrategy} and the {@link WaitStrategy}. The attempt number handed to them starts at 1
 * and the elapsed time is expressed in milliseconds, measured from the construction of this object.
 *
 * <p>An instance owns exactly one response future. Calling {@link #performAction()} again starts a
 * further chain of calls that feeds the same future and the same attempt counter.
 *
 * @param <I> type of the input handed to the wrapped call
 * @param <O> type of the value produced by the wrapped call's future
 */
public class FutureRetryer<I, O> {

    private static final Logger logger = LoggerFactory.getLogger(FutureRetryer.class);

    private final Function<I, ListenableFuture<O>> wrappedCall;
    private final I input;
    private final WaitStrategy waitStrategy;
    private final StopStrategy stopStrategy;
    private final Predicate<Attempt<Object>> retryableExceptionPredicate;
    private final ScheduledExecutorService scheduledExecutorService;
    private final SettableFuture<O> responseFuture;
    /** Number of retries scheduled so far; stays at 0 until the first retry is scheduled. */
    private final AtomicInteger attemptCount = new AtomicInteger(0);
    /** {@link System#nanoTime()} reading taken at construction, used to compute the elapsed time. */
    private final long startTime;

    /**
     * Creates a retryer. Prefer {@link Builder}, which validates the required arguments and supplies
     * default strategies.
     *
     * @param wrappedCall              the asynchronous call to (re)issue
     * @param input                    argument passed to {@code wrappedCall} on every attempt
     * @param waitStrategy             computes the delay, in milliseconds, before the next attempt
     * @param stopStrategy             decides when to give up
     * @param rejectionPredicate       returns {@code true} for failed attempts that may be retried
     * @param scheduledExecutorService executor on which delayed retries are scheduled
     */
    public FutureRetryer(Function<I, ListenableFuture<O>> wrappedCall, I input, WaitStrategy waitStrategy,
            StopStrategy stopStrategy, Predicate<Attempt<Object>> rejectionPredicate,
            ScheduledExecutorService scheduledExecutorService) {
        this.wrappedCall = wrappedCall;
        this.input = input;
        this.waitStrategy = waitStrategy;
        this.stopStrategy = stopStrategy;
        this.retryableExceptionPredicate = rejectionPredicate;
        this.scheduledExecutorService = scheduledExecutorService;
        this.responseFuture = SettableFuture.create();
        this.startTime = System.nanoTime();
    }

    /**
     * Issues the wrapped call and returns the future that will carry the final outcome.
     *
     * @return future completed with the first successful result, or failed with the last failure once
     *         no further retry is allowed
     */
    public ListenableFuture<O> performAction() {
        performActionImpl();
        return this.responseFuture;
    }

    /**
     * @return the number of retries scheduled so far (0 while only the initial call has been made)
     */
    public int getAttemptCount() {
        return this.attemptCount.get();
    }

    /**
     * Invokes the wrapped call once and wires its outcome to the success/failure handlers.
     *
     * <p>A runtime exception thrown by the wrapped call itself, or a {@code null} future returned by
     * it, is treated like any other failed attempt. Without this, such a failure during a scheduled
     * retry would be swallowed by the executor and the response future would never complete.
     */
    private void performActionImpl() {
        final ListenableFuture<O> callResponseFuture;
        try {
            callResponseFuture = this.wrappedCall.apply(this.input);
        } catch (RuntimeException e) {
            handleFailureResponse(e);
            return;
        }
        if (callResponseFuture == null) {
            handleFailureResponse(new NullPointerException("Wrapped call returned a null future."));
            return;
        }
        // Runnable::run is a same-thread executor, i.e. the callback runs exactly where the
        // executor-less addCallback overload would have run it. That overload is deprecated in newer
        // Guava releases (verify against the Guava version in use), so the executor is given explicitly.
        Futures.addCallback(callResponseFuture, new FutureCallback<O>() {
            @Override
            public void onSuccess(final O result) {
                handleSuccessfulResponse(result);
            }

            @Override
            public void onFailure(final Throwable throwable) {
                handleFailureResponse(throwable);
            }
        }, Runnable::run);
    }

    /**
     * Handles successful response by writing to the responseFuture.
     */
    private void handleSuccessfulResponse(O result) {
        this.responseFuture.set(result);
    }

    /**
     * Handles a failed attempt: schedules a retry if the failure is retryable and the stop strategy
     * still allows one, otherwise sets the failure on the responseFuture.
     *
     * <p>If the predicate, one of the strategies or the scheduler itself throws, the problem is logged
     * and the last failure is set on the responseFuture so that callers are never left waiting.
     */
    private void handleFailureResponse(Throwable throwable) {
        // Already completed or cancelled by the caller: nobody is left to observe a retry.
        if (this.responseFuture.isDone()) {
            return;
        }
        try {
            // Attempt numbers are 1-based and the elapsed time is in milliseconds.
            long attemptNumber = this.attemptCount.get() + 1L;
            long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.startTime);
            ExceptionAttempt attempt = new ExceptionAttempt(throwable, attemptNumber, elapsedMillis);

            // Give up if the failure is not retryable or the stop strategy says we are done.
            if (!this.retryableExceptionPredicate.apply(attempt) || this.stopStrategy.shouldStop(attempt)) {
                this.responseFuture.setException(throwable);
                return;
            }

            // increment after it is known that a retry should happen
            this.attemptCount.incrementAndGet();
            long delayTime = this.waitStrategy.computeSleepTime(attempt);
            this.scheduledExecutorService.schedule(() -> this.performActionImpl(), delayTime,
                    TimeUnit.MILLISECONDS);
        } catch (RuntimeException e) {
            // e.g. the scheduler rejected the task because it was shut down.
            logger.error("Unable to schedule a retry, failing with the last error.", e);
            this.responseFuture.setException(throwable);
        }
    }

    /**
     * Fluent builder for {@link FutureRetryer}.
     *
     * @param <IB> input type of the wrapped call
     * @param <OB> result type of the wrapped call's future
     */
    public static class Builder<IB, OB> {
        private static final int DEFAULT_STOP_ATTEMPT = 10;
        private static final int DEFAULT_WAIT_MULTIPLIER = 300;
        private static final int DEFAULT_WAIT_MAX = 60;
        private static final TimeUnit DEFAULT_WAIT_MAX_UNIT = TimeUnit.SECONDS;

        private Function<IB, ListenableFuture<OB>> wrappedCall;
        private IB input;
        private WaitStrategy waitStrategy;
        private StopStrategy stopStrategy;
        // Nothing is retried until retryIfExceptionOfType registers at least one exception type.
        private Predicate<Attempt<Object>> rejectionPredicate = Predicates.alwaysFalse();
        private ScheduledExecutorService scheduledExecutorService;

        /** Sets the asynchronous call to retry (required). */
        public Builder<IB, OB> setWrappedCall(Function<IB, ListenableFuture<OB>> wrappedCall) {
            this.wrappedCall = wrappedCall;
            return this;
        }

        /** Sets the argument passed to the wrapped call on every attempt. */
        public Builder<IB, OB> setInput(IB input) {
            this.input = input;
            return this;
        }

        /** Sets the strategy computing the delay between attempts; a default is used when unset. */
        public Builder<IB, OB> setWaitStrategy(WaitStrategy waitStrategy) {
            this.waitStrategy = waitStrategy;
            return this;
        }

        /** Sets the strategy deciding when to give up; a default is used when unset. */
        public Builder<IB, OB> setStopStrategy(StopStrategy stopStrategy) {
            this.stopStrategy = stopStrategy;
            return this;
        }

        /**
         * Marks failures whose cause is an instance of {@code exceptionClass} as retryable. May be
         * called several times; the registered types are combined with a logical OR.
         *
         * @throws NullPointerException if {@code exceptionClass} is null
         */
        public Builder<IB, OB> retryIfExceptionOfType(Class<? extends Throwable> exceptionClass) {
            Preconditions.checkNotNull(exceptionClass, "Exception class is required.");
            this.rejectionPredicate = Predicates.<Attempt<Object>>or(this.rejectionPredicate,
                    new ExceptionClassPredicate(exceptionClass));
            return this;
        }

        /** Sets the executor on which delayed retries are scheduled (required). */
        public Builder<IB, OB> setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
            this.scheduledExecutorService = scheduledExecutorService;
            return this;
        }

        /**
         * Builds the retryer, falling back to default stop and wait strategies when none were set.
         * The builder itself is left unchanged.
         *
         * @throws NullPointerException if the wrapped call or the scheduled executor service is missing
         */
        public FutureRetryer<IB, OB> build() {
            Preconditions.checkNotNull(this.wrappedCall, "Wrapped call is required.");
            Preconditions.checkNotNull(this.scheduledExecutorService, "Scheduled executor service is required.");

            StopStrategy effectiveStopStrategy = this.stopStrategy;
            if (effectiveStopStrategy == null) {
                logger.warn("No StopStrategy provided, using default strategy.");
                effectiveStopStrategy = StopStrategies.stopAfterAttempt(DEFAULT_STOP_ATTEMPT);
            }

            WaitStrategy effectiveWaitStrategy = this.waitStrategy;
            if (effectiveWaitStrategy == null) {
                logger.warn("No WaitStrategy provided, using default strategy.");
                effectiveWaitStrategy = WaitStrategies.exponentialWait(DEFAULT_WAIT_MULTIPLIER, DEFAULT_WAIT_MAX,
                        DEFAULT_WAIT_MAX_UNIT);
            }

            return new FutureRetryer<>(
                    this.wrappedCall,
                    this.input,
                    effectiveWaitStrategy,
                    effectiveStopStrategy,
                    this.rejectionPredicate,
                    this.scheduledExecutorService
            );
        }
    }

    /**
     * Attempt impl to be used with an Exception.
     */
    static final class ExceptionAttempt implements Attempt<Object> {
        private final ExecutionException e;
        private final long attemptNumber;
        private final long delaySinceFirstAttempt;

        /**
         * @param cause                  the failure of the attempt
         * @param attemptNumber          number of the attempt as reported to the strategies
         * @param delaySinceFirstAttempt elapsed time as reported to the strategies
         */
        ExceptionAttempt(Throwable cause, long attemptNumber, long delaySinceFirstAttempt) {
            this.e = new ExecutionException(cause);
            this.attemptNumber = attemptNumber;
            this.delaySinceFirstAttempt = delaySinceFirstAttempt;
        }

        /** Always throws, since this attempt carries a failure rather than a result. */
        @Override
        public Object get() throws ExecutionException {
            throw this.e;
        }

        @Override
        public boolean hasResult() {
            return false;
        }

        @Override
        public boolean hasException() {
            return true;
        }

        @Override
        public Object getResult() throws IllegalStateException {
            throw new IllegalStateException("The attempt resulted in an exception, not in a result");
        }

        @Override
        public Throwable getExceptionCause() throws IllegalStateException {
            return this.e.getCause();
        }

        @Override
        public long getAttemptNumber() {
            return this.attemptNumber;
        }

        @Override
        public long getDelaySinceFirstAttempt() {
            return this.delaySinceFirstAttempt;
        }
    }

    /**
     * Matches failed attempts whose cause is an instance of a given exception class.
     */
    static final class ExceptionClassPredicate implements Predicate<Attempt<Object>> {
        private final Class<? extends Throwable> exceptionClass;

        ExceptionClassPredicate(Class<? extends Throwable> exceptionClass) {
            this.exceptionClass = exceptionClass;
        }

        @Override
        public boolean apply(Attempt<Object> attempt) {
            // isInstance is false for a null cause, so no separate null check is needed.
            return attempt.hasException() && this.exceptionClass.isInstance(attempt.getExceptionCause());
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment