Skip to content

Instantly share code, notes, and snippets.

@DanielGrech
Created February 4, 2015 22:40
Show Gist options
  • Save DanielGrech/142f1d4c12824f8fa673 to your computer and use it in GitHub Desktop.
Save DanielGrech/142f1d4c12824f8fa673 to your computer and use it in GitHub Desktop.
import rx.Observable;
import rx.functions.Func0;
/**
* Wraps a source observable for creating a new observable
* on every subscribe.
* <p/>
* For use with {@link Observable#defer(rx.functions.Func0)}
*/
public class DeferredObservable<T> implements Func0<Observable<T>> {
private final Observable<T> source;
public DeferredObservable(Observable<T> source) {
this.source = source;
}
@Override
public Observable<T> call() {
return source;
}
}
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.functions.Func1;
import timber.log.Timber;
public class DelayFunction implements Func1<Integer, Observable<?>> {
@Override
public Observable<?> call(Integer delayInSeconds) {
Timber.v("Delaying observable by %s seconds", delayInSeconds);
return Observable.timer(delayInSeconds, TimeUnit.SECONDS);
}
}
sourceObservableToRetry
.compose(new ExponentialBackoffTransformer(3, new SecondsMultiplierBackoffStrategy(1))
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.immediate())
.subscribe(new Action1<>(){})
import rx.Observable;
import rx.functions.Func1;
import rx.functions.Func2;
public class ExponentialBackoffTransformer<T> implements Observable.Transformer<T, T> {
private final int attempts;
private final Func1<Integer, Integer> attemptToDelayMap;
public ExponentialBackoffTransformer(int attempts, Func1<Integer, Integer> attemptToDelayMap) {
this.attempts = attempts;
this.attemptToDelayMap = attemptToDelayMap;
}
public ExponentialBackoffTransformer(int attempts) {
this(attempts, ONE_SECOND_DELAY_PER_ATTEMPT_STRATEGY);
}
@Override
public Observable<T> call(final Observable<T> source) {
if (attempts < 1) {
return source;
}
return Observable.defer(new DeferredObservable<>(source))
.retryWhen(getRetryHandler());
}
private Func1<Observable<? extends Throwable>, Observable<?>> getRetryHandler() {
final Observable<Integer> attemptRange = Observable.range(1, attempts);
final Func2<Throwable, Integer, Integer> zipFunction
= new Func2<Throwable, Integer, Integer>() {
@Override
public Integer call(Throwable throwable, Integer attemptCount) {
return attemptCount;
}
};
return new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable.zipWith(attemptRange, zipFunction)
.map(attemptToDelayMap)
.flatMap(new DelayFunction());
}
};
}
public static final Func1<Integer, Integer> ONE_SECOND_DELAY_PER_ATTEMPT_STRATEGY
= new SecondsMultiplierBackoffStrategy(1);
/**
* Backoff for a certain number of seconds multiplied by each attempt.
*
* Eg. If the multiplier is set to '3':
*
* <table>
* <tr>
* <td align="center">Attempt</td>
* <td align="center">Delay</td>
* </tr>
* <tr>
* <td align="center">1</td>
* <td align="center">3 seconds</td>
* </tr>
* <tr>
* <td align="center">2</td>
* <td align="center">6 seconds</td>
* </tr>
* <tr>
* <td align="center">3</td>
* <td align="center">9 seconds</td>
* </tr>
* <tr>
* <td align="center">4</td>
* <td align="center">12 seconds</td>
* </tr>
* </table>
*
*
*/
public static class SecondsMultiplierBackoffStrategy implements Func1<Integer, Integer> {
private final int multiplierPerAttempt;
public SecondsMultiplierBackoffStrategy(int multiplierPerAttempt) {
this.multiplierPerAttempt = multiplierPerAttempt;
}
@Override
public Integer call(Integer attemptNumber) {
return attemptNumber * multiplierPerAttempt;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment