Skip to content

Instantly share code, notes, and snippets.

@ZacSweers
Forked from sddamico/LICENSE
Created February 23, 2016 01:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ZacSweers/90f49486a73f8cf9a500 to your computer and use it in GitHub Desktop.
Save ZacSweers/90f49486a73f8cf9a500 to your computer and use it in GitHub Desktop.
Exponential Backoff Transformer
/**
* @param interval The base interval to start backing off from. The function is: attemptNum^2 * intervalTime
* @param units The units for interval
* @param retryAttempts The max number of attempts to retry this task or -1 to try MAX_INT times,
*/
public static <T> Observable.Transformer<T, T> backoff(final long interval, final TimeUnit units, final int retryAttempts) {
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(final Observable<T> observable) {
return observable.retryWhen(
retryFunc(interval, units, retryAttempts),
Schedulers.immediate()
);
}
};
}
private static Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> retryFunc(final long interval, final TimeUnit units, final int attempts) {
return new Func1<Observable<? extends Throwable>, Observable<Long>>() {
@Override
public Observable<Long> call(Observable<? extends Throwable> observable) {
// zip our number of retries to the incoming errors so that we only produce retries
// when there's been an error
return observable.zipWith(
Observable.range(1, attempts > 0 ? attempts : Integer.MAX_VALUE),
new Func2<Throwable, Integer, Integer>() {
@Override
public Integer call(Throwable throwable, Integer attemptNumber) {
return attemptNumber;
}
})
// flatMap the int attempt number to a timer that will wait the specified delay
.flatMap(new Func1<Integer, Observable<Long>>() {
@Override
public Observable<Long> call(final Integer integer) {
long newInterval = interval * ((long) integer * (long) integer);
if (newInterval < 0) {
newInterval = Long.MAX_VALUE;
}
// use Schedulers#immediate() to keep on same thread
return Observable.timer(newInterval, units, Schedulers.immediate());
}
});
}
};
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment