Skip to content

Instantly share code, notes, and snippets.

@jemshit
Last active January 13, 2021 09:25
Show Gist options
  • Save jemshit/5b913e98c876863205b2de57b581b6f5 to your computer and use it in GitHub Desktop.
Save jemshit/5b913e98c876863205b2de57b581b6f5 to your computer and use it in GitHub Desktop.
RxJava: retry a web service call with an incrementally increasing delay when a network error (IOException) occurs.
/**
 * {@code retryWhen} handler that retries after a linearly growing delay when the source
 * fails with an {@link IOException} (treated here as a network error).
 *
 * <p>The n-th retry waits {@code delay * n} units of {@code timeUnit}. Once
 * {@code numberOfTry} retries have been used, or as soon as any other kind of error is
 * seen, the error is forwarded so it reaches {@code onError()}.
 */
public class RetryWithDelay implements Func1<Observable<? extends Throwable>, Observable<?>> {
    private final int numberOfTry;
    private final int delay;
    private final TimeUnit timeUnit;

    /**
     * @param numberOfTry maximum number of retries before the error is propagated
     * @param delay       base delay; the n-th retry waits {@code delay * n}
     * @param timeUnit    unit of {@code delay}
     */
    public RetryWithDelay(int numberOfTry, int delay, TimeUnit timeUnit) {
        this.numberOfTry = numberOfTry;
        this.delay = delay;
        this.timeUnit = timeUnit;
    }

    /**
     * Maps each upstream error either to a timer (retry after the delay) or to an error
     * (give up).
     *
     * @param errors stream of errors emitted by the source being retried
     * @return stream whose emissions trigger a retry and whose error terminates the chain
     */
    @Override
    public Observable<?> call(Observable<? extends Throwable> errors) {
        // The counter is local to this invocation instead of an instance field, so the
        // retry budget is no longer shared between (or left exhausted by) earlier uses of
        // the same RetryWithDelay instance.
        // NOTE(review): relies on retryWhen invoking call() once per subscription and on
        // errors being delivered serially -- confirm against the RxJava 1 documentation.
        final int[] retryCount = {0};
        return errors
                .flatMap(error -> {
                    if (error instanceof IOException) { // Network Error
                        if (retryCount[0] < numberOfTry) {
                            retryCount[0]++;
                            // Widen before multiplying so large delays cannot overflow int.
                            long delayFinal = (long) delay * retryCount[0];
                            // You might want to notify UI with EventBus etc... with Retry duration
                            Log.d("RetryWithDelay"+" call()",
                                    "retry " + retryCount[0] + " after " + delayFinal + " " + timeUnit);
                            return Observable.timer(delayFinal, timeUnit);
                        } else {
                            return Observable.error(error); // Let them go to onError() after few retries
                        }
                    } else { // Http Error or other Unexpected Error
                        return Observable.error(error); // Let them go to onError()
                    }
                });
    }
}
@jemshit
Copy link
Author

jemshit commented Aug 21, 2016

Usage:

.retryWhen(new RetryWithDelay(2, 5, TimeUnit.SECONDS))
.subscribe()

@jemshit
Copy link
Author

jemshit commented Nov 9, 2017

RxJava2:

/**
 * {@code retryWhen} handler that retries after a linearly growing delay when the source
 * fails with an {@link IOException} (treated here as a network error).
 *
 * <p>The n-th retry waits {@code n * delay} units of {@code timeUnit}, timed on
 * {@code timerScheduler}. After {@code numberOfTry} retries, or immediately for any other
 * kind of error, the error is propagated downstream.
 */
public class RetryWithDelay implements Function<Observable<Throwable>, ObservableSource<?>> {
    private final int numberOfTry;
    private final int delay;
    private final TimeUnit timeUnit;
    private final Scheduler timerScheduler; // To provide different Scheduler for testing purpose

    /**
     * Creates a handler whose retry timers run on {@code Schedulers.computation()}.
     *
     * @param numberOfTry maximum number of retries before the error is propagated
     * @param delay       base delay; the n-th retry waits {@code n * delay}
     * @param timeUnit    unit of {@code delay}
     * @throws IllegalArgumentException if {@code numberOfTry} is negative or
     *                                  {@code Integer.MAX_VALUE}
     * @throws NullPointerException     if {@code timeUnit} is null
     */
    public RetryWithDelay(int numberOfTry, int delay, TimeUnit timeUnit) {
        this(numberOfTry, delay, timeUnit, Schedulers.computation());
    }

    /**
     * @param numberOfTry    maximum number of retries before the error is propagated
     * @param delay          base delay; the n-th retry waits {@code n * delay}
     * @param timeUnit       unit of {@code delay}
     * @param timerScheduler scheduler the retry timers run on
     * @throws IllegalArgumentException if {@code numberOfTry} is negative or
     *                                  {@code Integer.MAX_VALUE}
     * @throws NullPointerException     if {@code timeUnit} or {@code timerScheduler} is null
     */
    public RetryWithDelay(int numberOfTry, int delay, TimeUnit timeUnit, Scheduler timerScheduler) {
        // apply() zips the errors with range(1, numberOfTry + 1). A count of 0
        // (numberOfTry == -1) gives an empty range, which would end the zip without ever
        // forwarding the error; other out-of-range values make the count negative.
        // Reject them up front instead of misbehaving at subscription time.
        if (numberOfTry < 0 || numberOfTry == Integer.MAX_VALUE) {
            throw new IllegalArgumentException(
                    "numberOfTry must be in [0, Integer.MAX_VALUE) but was " + numberOfTry);
        }
        if (timeUnit == null) {
            throw new NullPointerException("timeUnit is null");
        }
        if (timerScheduler == null) {
            throw new NullPointerException("timerScheduler is null");
        }
        this.numberOfTry = numberOfTry;
        this.timeUnit = timeUnit;
        this.delay = delay;
        this.timerScheduler = timerScheduler;
    }

    /**
     * Pairs every upstream error with its 1-based attempt number and turns the pair into
     * either a timer (retry after the delay) or an error (give up).
     *
     * @param throwableObservable stream of errors emitted by the source being retried
     * @return stream whose emissions trigger a retry and whose error terminates the chain
     */
    @Override
    public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
        return throwableObservable
                .zipWith(Observable.range(1, numberOfTry + 1), (throwable, counter) -> { // +1 for error propagation after last retry
                    if (throwable instanceof IOException && counter <= numberOfTry) { // Network error with retries left
                        // Computed once as long so the logged value and the timer agree
                        // and large delays cannot overflow int.
                        long retryDelay = (long) counter * delay;
                        System.out.println("Retrying after " + retryDelay + " time unit");
                        return Observable.timer(retryDelay, timeUnit, timerScheduler);
                    }
                    // Non-network error, or retries exhausted: hand the error downstream.
                    return Observable.error(throwable);
                })
                .flatMap(x -> x);
    }
}

@jemshit
Copy link
Author

jemshit commented Nov 9, 2017

Test

public class RetryWithDelayTest {

    private boolean returnCorrectAnswer;

    @Before
    public void setUp() throws Exception {
        returnCorrectAnswer = false;
    }

    @Before
    public void tearDown() throws Exception {
        returnCorrectAnswer = false;
    }


    @Test
    public void retry_shouldRetryOneTimeAndFail() {
        // Assign
        TestObserver<String> testObserver = new TestObserver<>();
        TestScheduler testScheduler = new TestScheduler();

        // Act
        createObservable()
                .subscribeOn(testScheduler)
                .retryWhen(new RetryWithDelay(1, 1, TimeUnit.SECONDS, testScheduler))
                .subscribe(testObserver);

        // Assert
        testObserver.assertSubscribed();
        testObserver.assertNoValues();
        testObserver.assertNoErrors();

        testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
        testObserver.assertValueCount(0);
        Assert.assertEquals(testObserver.errorCount(), 1);
        testObserver.assertError(IOException.class);
        testObserver.assertNotComplete();
    }

    @Test
    public void retry_shouldRetryTwoTimesAndFail() {
        // Assign
        TestObserver<String> testObserver = new TestObserver<>();
        TestScheduler testScheduler = new TestScheduler();

        // Act
        createObservable()
                .retryWhen(new RetryWithDelay(2, 1, TimeUnit.SECONDS, testScheduler))
                .subscribeOn(Schedulers.trampoline())
                .subscribe(testObserver);

        // Assert
        testObserver.assertSubscribed();
        testObserver.assertNoValues();
        testObserver.assertNoErrors();

        testScheduler.advanceTimeBy(3, TimeUnit.SECONDS);
        testObserver.assertValueCount(0);
        Assert.assertEquals(testObserver.errorCount(), 1);
        testObserver.assertError(IOException.class);
        testObserver.assertNotComplete();
    }

    @Test
    public void retry_shouldRetryOneTimeAndSucceed() {
        // Assign
        TestObserver<String> testObserver = new TestObserver<>();
        TestScheduler testScheduler = new TestScheduler();

        // Act
        createObservable()
                .subscribeOn(testScheduler)
                .retryWhen(new RetryWithDelay(1, 2, TimeUnit.SECONDS, testScheduler))
                .subscribe(testObserver);

        // Assert
        testObserver.assertSubscribed();
        testObserver.assertNoValues();
        testObserver.assertNoErrors();

        testScheduler.advanceTimeTo(1, TimeUnit.SECONDS);
        testObserver.assertNoValues();
        testObserver.assertNoErrors();

        returnCorrectAnswer = true;
        testScheduler.advanceTimeTo(2, TimeUnit.SECONDS);
        testObserver.assertValueCount(1);
        testObserver.assertNoErrors();
    }

    @Test
    public void retry_shouldRetryTwoTimesAndSucceed() {
        // Assign
        TestObserver<String> testObserver = new TestObserver<>();
        TestScheduler testScheduler = new TestScheduler();

        // Act
        createObservable()
                .subscribeOn(testScheduler)
                .retryWhen(new RetryWithDelay(2, 2, TimeUnit.SECONDS, testScheduler))
                .subscribe(testObserver);

        // Assert
        testObserver.assertSubscribed();
        testObserver.assertNoValues();
        testObserver.assertNoErrors();

        testScheduler.advanceTimeTo(2, TimeUnit.SECONDS);
        testObserver.assertNoValues();
        testObserver.assertNoErrors();

        returnCorrectAnswer = true;
        testScheduler.advanceTimeTo(6, TimeUnit.SECONDS);
        testObserver.assertValueCount(1);
        testObserver.assertNoErrors();
    }

    @Test
    public void retry_shouldNotRetryAndShouldFailOnDifferentExceptions() {
        // Assign
        TestObserver testObserver = new TestObserver();
        TestScheduler testScheduler = new TestScheduler();

        // Act
        Observable.create(e -> e.onError(new RuntimeException()))
                .subscribeOn(testScheduler)
                .retryWhen(new RetryWithDelay(1, 2, TimeUnit.SECONDS))
                .subscribe(testObserver);

        // Assert
        testObserver.assertSubscribed();
        testObserver.assertNoValues();
        testObserver.assertNoErrors();

        testScheduler.advanceTimeBy(1, TimeUnit.SECONDS); // Retry is after 2s, so no retry is called
        testObserver.assertValueCount(0);
        Assert.assertEquals(1, testObserver.errorCount());
        testObserver.assertError(RuntimeException.class);
        testObserver.assertNotComplete();
    }

    // Helper
    private Observable<String> createObservable() {
        return Observable.create(e -> {
            System.out.println(returnCorrectAnswer);

            if (returnCorrectAnswer)
                e.onNext("Correct Answer");
            else
                e.onError(new IOException());
        });
    }
}

@jemshit
Copy link
Author

jemshit commented Jan 13, 2021

Screw this, use Coroutine and good-old while loop :D

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment