-
-
Save manasdk/ea816f45b26ff4a74f99e2c99116b0b5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.github.rholder.retry; | |
import com.github.rholder.retry.Attempt; | |
import com.github.rholder.retry.WaitStrategies; | |
import com.github.rholder.retry.WaitStrategy; | |
import com.github.rholder.retry.StopStrategies; | |
import com.github.rholder.retry.StopStrategy; | |
import com.google.common.base.Function; | |
import com.google.common.base.Preconditions; | |
import com.google.common.base.Predicate; | |
import com.google.common.base.Predicates; | |
import com.google.common.util.concurrent.FutureCallback; | |
import com.google.common.util.concurrent.Futures; | |
import com.google.common.util.concurrent.ListenableFuture; | |
import com.google.common.util.concurrent.SettableFuture; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.ScheduledExecutorService; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicInteger; | |
/** | |
* Retries calls that returns a {@link ListenableFuture} by itself returning a {@link ListenableFuture}. | |
*/ | |
public class FutureRetryer<I, O> { | |
private static final Logger logger = LoggerFactory.getLogger(FutureRetryer.class); | |
private final Function<I, ListenableFuture<O>> wrappedCall; | |
private final I input; | |
private final WaitStrategy waitStrategy; | |
private final StopStrategy stopStrategy; | |
private final Predicate<Attempt<Object>> retryableExceptionPredicate; | |
private final ScheduledExecutorService scheduledExecutorService; | |
private final SettableFuture<O> responseFuture; | |
private final AtomicInteger attemptCount = new AtomicInteger(0); | |
private final long startTime; | |
public FutureRetryer(Function<I, ListenableFuture<O>> wrappedCall, I input, WaitStrategy waitStrategy, | |
StopStrategy stopStrategy, Predicate<Attempt<Object>> rejectionPredicate, | |
ScheduledExecutorService scheduledExecutorService) { | |
this.wrappedCall = wrappedCall; | |
this.input = input; | |
this.waitStrategy = waitStrategy; | |
this.stopStrategy = stopStrategy; | |
this.retryableExceptionPredicate = rejectionPredicate; | |
this.scheduledExecutorService = scheduledExecutorService; | |
this.responseFuture = SettableFuture.create(); | |
this.startTime = System.nanoTime(); | |
} | |
public ListenableFuture<O> performAction() { | |
performActionImpl(); | |
return this.responseFuture; | |
} | |
public int getAttemptCount() { | |
return this.attemptCount.get(); | |
} | |
private void performActionImpl() { | |
ListenableFuture<O> callResponseFuture = wrappedCall.apply(this.input); | |
Futures.addCallback(callResponseFuture, new FutureCallback<O>() { | |
@Override | |
public void onSuccess(final O result) { | |
handleSuccessfulResponse(result); | |
} | |
@Override | |
public void onFailure(final Throwable throwable) { | |
handleFailureResponse(throwable); | |
} | |
}); | |
} | |
/** | |
* Handles successful response by writing to the responseFuture. | |
*/ | |
private void handleSuccessfulResponse(O result) { | |
this.responseFuture.set(result); | |
} | |
/** | |
* Handles failure response by retrying if the failure is retryable and the all attempts have not been | |
* exhausted. In case a retry is not possible the last exception is set on the responseFuture. | |
*/ | |
private void handleFailureResponse(Throwable throwable) { | |
int currentAttemptCount = this.attemptCount.get(); | |
ExceptionAttempt attempt = new ExceptionAttempt(throwable, currentAttemptCount, | |
System.nanoTime() - this.startTime); | |
// If the retryable exception predicate does not allow the last attempt then set the exception on the | |
// response future and end all further retries. | |
if (!this.retryableExceptionPredicate.apply((Attempt<Object>) attempt)) { | |
this.responseFuture.setException(throwable); | |
return; | |
} | |
// check if the no of retries is exhausted | |
if (stopStrategy.shouldStop(attempt)) { | |
this.responseFuture.setException(throwable); | |
return; | |
} | |
// increment after it is known that a retry should happen | |
this.attemptCount.incrementAndGet(); | |
// schedule retry based after some delay | |
long delayTime = this.waitStrategy.computeSleepTime(attempt); | |
// schedule for delayed execution. | |
this.scheduledExecutorService.schedule(() -> this.performActionImpl(), delayTime, TimeUnit.MILLISECONDS); | |
} | |
public static class Builder<IB, OB> { | |
private static final int DEFAULT_STOP_ATTEMPT = 10; | |
private static final int DEFAULT_WAIT_MULTIPLIER = 300; | |
private static final int DEFAULT_WAIT_MAX = 60; | |
private static final TimeUnit DEFAULT_WAIT_MAX_UNIT = TimeUnit.SECONDS; | |
private Function<IB, ListenableFuture<OB>> wrappedCall; | |
private IB input; | |
private WaitStrategy waitStrategy; | |
private StopStrategy stopStrategy; | |
private Predicate<Attempt<Object>> rejectionPredicate = Predicates.alwaysFalse(); | |
private ScheduledExecutorService scheduledExecutorService; | |
public Builder<IB, OB> setWrappedCall(Function<IB, ListenableFuture<OB>> wrappedCall) { | |
this.wrappedCall = wrappedCall; | |
return this; | |
} | |
public Builder<IB, OB> setInput(IB input) { | |
this.input = input; | |
return this; | |
} | |
public Builder<IB, OB> setWaitStrategy(WaitStrategy waitStrategy) { | |
this.waitStrategy = waitStrategy; | |
return this; | |
} | |
public Builder<IB, OB> setStopStrategy(StopStrategy stopStrategy) { | |
this.stopStrategy = stopStrategy; | |
return this; | |
} | |
public Builder<IB, OB> retryIfExceptionOfType(Class<? extends Throwable> exceptionClass) { | |
this.rejectionPredicate = (Predicate<Attempt<Object>>) Predicates.or(this.rejectionPredicate, | |
new ExceptionClassPredicate(exceptionClass)); | |
return this; | |
} | |
public Builder<IB, OB> setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { | |
this.scheduledExecutorService = scheduledExecutorService; | |
return this; | |
} | |
public FutureRetryer<IB, OB> build() { | |
Preconditions.checkNotNull(this.wrappedCall, "Wrapped call is required."); | |
Preconditions.checkNotNull(this.scheduledExecutorService, "Scheduled executor service is required."); | |
if (this.stopStrategy == null) { | |
logger.warn("No StopStrategy provided, using default strategy."); | |
this.stopStrategy = StopStrategies.stopAfterAttempt(DEFAULT_STOP_ATTEMPT); | |
} | |
if (this.waitStrategy == null) { | |
logger.warn("No WaitStrategy provided, using default strategy."); | |
this.waitStrategy = WaitStrategies.exponentialWait(DEFAULT_WAIT_MULTIPLIER, DEFAULT_WAIT_MAX, | |
DEFAULT_WAIT_MAX_UNIT); | |
} | |
return new FutureRetryer<>( | |
this.wrappedCall, | |
this.input, | |
this.waitStrategy, | |
this.stopStrategy, | |
this.rejectionPredicate, | |
this.scheduledExecutorService | |
); | |
} | |
} | |
/** | |
* Attempt impl to be used with an Exception. | |
*/ | |
static final class ExceptionAttempt implements Attempt<Object> { | |
private final ExecutionException e; | |
private final long attemptNumber; | |
private final long delaySinceFirstAttempt; | |
ExceptionAttempt(Throwable cause, long attemptNumber, long delaySinceFirstAttempt) { | |
this.e = new ExecutionException(cause); | |
this.attemptNumber = attemptNumber; | |
this.delaySinceFirstAttempt = delaySinceFirstAttempt; | |
} | |
@Override | |
public Object get() throws ExecutionException { | |
throw this.e; | |
} | |
@Override | |
public boolean hasResult() { | |
return false; | |
} | |
@Override | |
public boolean hasException() { | |
return true; | |
} | |
@Override | |
public Object getResult() throws IllegalStateException { | |
throw new IllegalStateException("The attempt resulted in an exception, not in a result"); | |
} | |
@Override | |
public Throwable getExceptionCause() throws IllegalStateException { | |
return this.e.getCause(); | |
} | |
@Override | |
public long getAttemptNumber() { | |
return attemptNumber; | |
} | |
@Override | |
public long getDelaySinceFirstAttempt() { | |
return delaySinceFirstAttempt; | |
} | |
} | |
static final class ExceptionClassPredicate implements Predicate<Attempt<Object>> { | |
private Class<? extends Throwable> exceptionClass; | |
ExceptionClassPredicate(Class<? extends Throwable> exceptionClass) { | |
this.exceptionClass = exceptionClass; | |
} | |
@Override | |
public boolean apply(Attempt<Object> attempt) { | |
if (!attempt.hasException()) { | |
return false; | |
} | |
return exceptionClass.isAssignableFrom(attempt.getExceptionCause().getClass()); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment