Last active
January 13, 2021 09:25
-
-
Save jemshit/5b913e98c876863205b2de57b581b6f5 to your computer and use it in GitHub Desktop.
RxJava Retry Web Service Connection incrementally if there is Network Kind of Error.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class RetryWithDelay implements Func1<Observable<? extends Throwable>, Observable<?>> { | |
private int numberOfTry; | |
private int delay; | |
private int retryCount = 1; | |
private final TimeUnit timeUnit; | |
public RetryWithDelay(int numberOfTry, int delay, TimeUnit timeUnit) { | |
this.numberOfTry = numberOfTry; | |
this.delay = delay; | |
this.timeUnit = timeUnit; | |
} | |
@Override | |
public Observable<?> call(Observable<? extends Throwable> errors) { | |
return errors | |
.flatMap(error -> { | |
if(error instanceof IOException) { // Network Error | |
int delayFinal = delay * retryCount; | |
retryCount++; | |
if (retryCount <= numberOfTry+1) { | |
// You might want to notify UI with EventBus etc... with Retry duration | |
Log.d("RetryWithDelay"+" call()", "retryCount " + delayFinal); | |
return Observable.timer((long) delayFinal, timeUnit); | |
} else { | |
return Observable.error(error); // Let them go to onError() after few retries | |
} | |
}else{ // Http Error or other Unexpected Error | |
return Observable.error(error); // Let them go to onError() | |
} | |
}); | |
} | |
} |
Usage:
.retryWhen(new RetryWithDelay(2, 5, TimeUnit.SECONDS))
.subscribe()
RxJava2:
public class RetryWithDelay implements Function<Observable<Throwable>, ObservableSource<?>> {
private final int numberOfTry;
private final int delay;
private final TimeUnit timeUnit;
private final Scheduler timerScheduler; // To provide different Scheduler for testing purpose
public RetryWithDelay(int numberOfTry, int delay, TimeUnit timeUnit) {
this.numberOfTry = numberOfTry;
this.timeUnit = timeUnit;
this.delay = delay;
this.timerScheduler = Schedulers.computation();
}
public RetryWithDelay(int numberOfTry, int delay, TimeUnit timeUnit, Scheduler timerScheduler) {
this.numberOfTry = numberOfTry;
this.timeUnit = timeUnit;
this.delay = delay;
this.timerScheduler = timerScheduler;
}
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable
.zipWith(Observable.range(1, numberOfTry + 1), (throwable, counter) -> { // +1 for error propagation after last retry
if (throwable instanceof IOException) { // If Network Error
if (counter < numberOfTry + 1) {
System.out.println("Retrying after " + counter * delay + " time unit");
return Observable.timer((long) counter * delay, timeUnit, timerScheduler);
} else
return Observable.error(throwable);
} else
return Observable.error(throwable);
})
.flatMap(x -> x);
}
}
Test
public class RetryWithDelayTest {
private boolean returnCorrectAnswer;
@Before
public void setUp() throws Exception {
returnCorrectAnswer = false;
}
@Before
public void tearDown() throws Exception {
returnCorrectAnswer = false;
}
@Test
public void retry_shouldRetryOneTimeAndFail() {
// Assign
TestObserver<String> testObserver = new TestObserver<>();
TestScheduler testScheduler = new TestScheduler();
// Act
createObservable()
.subscribeOn(testScheduler)
.retryWhen(new RetryWithDelay(1, 1, TimeUnit.SECONDS, testScheduler))
.subscribe(testObserver);
// Assert
testObserver.assertSubscribed();
testObserver.assertNoValues();
testObserver.assertNoErrors();
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
testObserver.assertValueCount(0);
Assert.assertEquals(testObserver.errorCount(), 1);
testObserver.assertError(IOException.class);
testObserver.assertNotComplete();
}
@Test
public void retry_shouldRetryTwoTimesAndFail() {
// Assign
TestObserver<String> testObserver = new TestObserver<>();
TestScheduler testScheduler = new TestScheduler();
// Act
createObservable()
.retryWhen(new RetryWithDelay(2, 1, TimeUnit.SECONDS, testScheduler))
.subscribeOn(Schedulers.trampoline())
.subscribe(testObserver);
// Assert
testObserver.assertSubscribed();
testObserver.assertNoValues();
testObserver.assertNoErrors();
testScheduler.advanceTimeBy(3, TimeUnit.SECONDS);
testObserver.assertValueCount(0);
Assert.assertEquals(testObserver.errorCount(), 1);
testObserver.assertError(IOException.class);
testObserver.assertNotComplete();
}
@Test
public void retry_shouldRetryOneTimeAndSucceed() {
// Assign
TestObserver<String> testObserver = new TestObserver<>();
TestScheduler testScheduler = new TestScheduler();
// Act
createObservable()
.subscribeOn(testScheduler)
.retryWhen(new RetryWithDelay(1, 2, TimeUnit.SECONDS, testScheduler))
.subscribe(testObserver);
// Assert
testObserver.assertSubscribed();
testObserver.assertNoValues();
testObserver.assertNoErrors();
testScheduler.advanceTimeTo(1, TimeUnit.SECONDS);
testObserver.assertNoValues();
testObserver.assertNoErrors();
returnCorrectAnswer = true;
testScheduler.advanceTimeTo(2, TimeUnit.SECONDS);
testObserver.assertValueCount(1);
testObserver.assertNoErrors();
}
@Test
public void retry_shouldRetryTwoTimesAndSucceed() {
// Assign
TestObserver<String> testObserver = new TestObserver<>();
TestScheduler testScheduler = new TestScheduler();
// Act
createObservable()
.subscribeOn(testScheduler)
.retryWhen(new RetryWithDelay(2, 2, TimeUnit.SECONDS, testScheduler))
.subscribe(testObserver);
// Assert
testObserver.assertSubscribed();
testObserver.assertNoValues();
testObserver.assertNoErrors();
testScheduler.advanceTimeTo(2, TimeUnit.SECONDS);
testObserver.assertNoValues();
testObserver.assertNoErrors();
returnCorrectAnswer = true;
testScheduler.advanceTimeTo(6, TimeUnit.SECONDS);
testObserver.assertValueCount(1);
testObserver.assertNoErrors();
}
@Test
public void retry_shouldNotRetryAndShouldFailOnDifferentExceptions() {
// Assign
TestObserver testObserver = new TestObserver();
TestScheduler testScheduler = new TestScheduler();
// Act
Observable.create(e -> e.onError(new RuntimeException()))
.subscribeOn(testScheduler)
.retryWhen(new RetryWithDelay(1, 2, TimeUnit.SECONDS))
.subscribe(testObserver);
// Assert
testObserver.assertSubscribed();
testObserver.assertNoValues();
testObserver.assertNoErrors();
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS); // Retry is after 2s, so no retry is called
testObserver.assertValueCount(0);
Assert.assertEquals(1, testObserver.errorCount());
testObserver.assertError(RuntimeException.class);
testObserver.assertNotComplete();
}
// Helper
private Observable<String> createObservable() {
return Observable.create(e -> {
System.out.println(returnCorrectAnswer);
if (returnCorrectAnswer)
e.onNext("Correct Answer");
else
e.onError(new IOException());
});
}
}
Screw this, use Coroutine and good-old while loop :D
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
You shouldn't check if there is Connection on phone using BroadcastReceivable in this Class! If there is Network error, just Retry. Otherwise send to onError(). Also you should write another Helper for Network Connectivity of Phone with BroadcastReceivable which should notify User globally across the App.