Skip to content

Instantly share code, notes, and snippets.

@davinctor
Last active February 3, 2020 23:39
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save davinctor/a302fab19a65550aae60249e1a74c1c7 to your computer and use it in GitHub Desktop.
Save davinctor/a302fab19a65550aae60249e1a74c1c7 to your computer and use it in GitHub Desktop.
Observable.Transformer to add retry feature to source observable if it fails. Every retry can be delayed.
import android.support.annotation.IntRange;
import android.support.annotation.NonNull;
import android.text.TextUtils;
import com.petcube.android.helpers.Log;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
import rx.functions.Func2;
/**
* {@link Observable.Transformer} to add retry feature to source observable if it fails.
* Every retry can be delayed.
*/
public class RetryWithDelayTransformer<T> implements Observable.Transformer<T, T> {
/**
* Rx flow required additional retry number. This used special for {@link Observable#range(int, int)}
* operator, because it call {@link Subscriber#onCompleted()} when range came to end, that's why
* item emitted by source observable doesn't forwarded.
*/
private static final int ADDITIONAL_RETRY_COUNT = 1;
private final int mRetryCount;
private final String mOperationTag;
private final long mRetryDelayMs;
public RetryWithDelayTransformer(@NonNull String operationTag,
@IntRange(from = 1) int retryCount,
@IntRange(from = 1) long retryDelayMs) {
if (TextUtils.isEmpty(operationTag)) {
throw new IllegalArgumentException("Operation tag have to be not null");
}
if (retryCount < 1) {
throw new IllegalArgumentException("Retry count can't be less than 1");
}
if (retryDelayMs < 1) {
throw new IllegalArgumentException("Retry delay ms can't be less than 1");
}
mOperationTag = operationTag;
mRetryCount = retryCount + ADDITIONAL_RETRY_COUNT;
mRetryDelayMs = retryDelayMs;
}
@Override
public Observable<T> call(Observable<T> tObservable) {
return tObservable
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable
.zipWith(Observable.range(1, mRetryCount),
new Func2<Throwable, Integer, Integer>() {
@Override
public Integer call(Throwable throwable, Integer attemptNumber) {
if (attemptNumber > mRetryCount - ADDITIONAL_RETRY_COUNT) {
throw new RuntimeException(throwable);
}
return attemptNumber;
}
})
.flatMap(new Func1<Integer, Observable<Long>>() {
@Override
public Observable<Long> call(Integer attemptNumber) {
long delay = attemptNumber * mRetryDelayMs;
Log.i(mOperationTag, "Delay retry by " + delay + " ms");
return Observable.timer(delay, TimeUnit.MILLISECONDS);
}
})
.onErrorResumeNext(new Func1<Throwable, Observable<? extends Long>>() {
@Override
public Observable<? extends Long> call(Throwable throwable) {
return Observable.error(throwable);
}
});
}
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment