Skip to content

Instantly share code, notes, and snippets.

@jemshit
Last active January 13, 2021 09:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jemshit/5b913e98c876863205b2de57b581b6f5 to your computer and use it in GitHub Desktop.
Save jemshit/5b913e98c876863205b2de57b581b6f5 to your computer and use it in GitHub Desktop.
RxJava Retry Web Service Connection incrementally if there is Network Kind of Error.
public class RetryWithDelay implements Func1<Observable<? extends Throwable>, Observable<?>> {
private int numberOfTry;
private int delay;
private int retryCount = 1;
private final TimeUnit timeUnit;
public RetryWithDelay(int numberOfTry, int delay, TimeUnit timeUnit) {
this.numberOfTry = numberOfTry;
this.delay = delay;
this.timeUnit = timeUnit;
}
@Override
public Observable<?> call(Observable<? extends Throwable> errors) {
return errors
.flatMap(error -> {
if(error instanceof IOException) { // Network Error
int delayFinal = delay * retryCount;
retryCount++;
if (retryCount <= numberOfTry+1) {
// You might want to notify UI with EventBus etc... with Retry duration
Log.d("RetryWithDelay"+" call()", "retryCount " + delayFinal);
return Observable.timer((long) delayFinal, timeUnit);
} else {
return Observable.error(error); // Let them go to onError() after few retries
}
}else{ // Http Error or other Unexpected Error
return Observable.error(error); // Let them go to onError()
}
});
}
}
@jemshit
Copy link
Author

jemshit commented Aug 21, 2016

You shouldn't check if there is Connection on phone using BroadcastReceivable in this Class! If there is Network error, just Retry. Otherwise send to onError(). Also you should write another Helper for Network Connectivity of Phone with BroadcastReceivable which should notify User globally across the App.

@jemshit
Copy link
Author

jemshit commented Aug 21, 2016

Usage:

.retryWhen(new RetryWithDelay(2, 5, TimeUnit.SECONDS))
.subscribe()

@jemshit
Copy link
Author

jemshit commented Nov 9, 2017

RxJava2:

public class RetryWithDelay implements Function<Observable<Throwable>, ObservableSource<?>> {
    private final int numberOfTry;
    private final int delay;
    private final TimeUnit timeUnit;
    private final Scheduler timerScheduler; // To provide different Scheduler for testing purpose

    public RetryWithDelay(int numberOfTry, int delay, TimeUnit timeUnit) {
        this.numberOfTry = numberOfTry;
        this.timeUnit = timeUnit;
        this.delay = delay;
        this.timerScheduler = Schedulers.computation();
    }

    public RetryWithDelay(int numberOfTry, int delay, TimeUnit timeUnit, Scheduler timerScheduler) {
        this.numberOfTry = numberOfTry;
        this.timeUnit = timeUnit;
        this.delay = delay;
        this.timerScheduler = timerScheduler;
    }

    @Override
    public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
        return throwableObservable
                .zipWith(Observable.range(1, numberOfTry + 1), (throwable, counter) -> { // +1 for error propagation after last retry
                    if (throwable instanceof IOException) { // If Network Error
                        if (counter < numberOfTry + 1) {
                            System.out.println("Retrying after " + counter * delay + " time unit");
                            return Observable.timer((long) counter * delay, timeUnit, timerScheduler);
                        } else
                            return Observable.error(throwable);
                    } else
                        return Observable.error(throwable);
                })
                .flatMap(x -> x);
    }
}

@jemshit
Copy link
Author

jemshit commented Nov 9, 2017

Test

public class RetryWithDelayTest {

    private boolean returnCorrectAnswer;

    @Before
    public void setUp() throws Exception {
        returnCorrectAnswer = false;
    }

    @Before
    public void tearDown() throws Exception {
        returnCorrectAnswer = false;
    }


    @Test
    public void retry_shouldRetryOneTimeAndFail() {
        // Assign
        TestObserver<String> testObserver = new TestObserver<>();
        TestScheduler testScheduler = new TestScheduler();

        // Act
        createObservable()
                .subscribeOn(testScheduler)
                .retryWhen(new RetryWithDelay(1, 1, TimeUnit.SECONDS, testScheduler))
                .subscribe(testObserver);

        // Assert
        testObserver.assertSubscribed();
        testObserver.assertNoValues();
        testObserver.assertNoErrors();

        testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
        testObserver.assertValueCount(0);
        Assert.assertEquals(testObserver.errorCount(), 1);
        testObserver.assertError(IOException.class);
        testObserver.assertNotComplete();
    }

    @Test
    public void retry_shouldRetryTwoTimesAndFail() {
        // Assign
        TestObserver<String> testObserver = new TestObserver<>();
        TestScheduler testScheduler = new TestScheduler();

        // Act
        createObservable()
                .retryWhen(new RetryWithDelay(2, 1, TimeUnit.SECONDS, testScheduler))
                .subscribeOn(Schedulers.trampoline())
                .subscribe(testObserver);

        // Assert
        testObserver.assertSubscribed();
        testObserver.assertNoValues();
        testObserver.assertNoErrors();

        testScheduler.advanceTimeBy(3, TimeUnit.SECONDS);
        testObserver.assertValueCount(0);
        Assert.assertEquals(testObserver.errorCount(), 1);
        testObserver.assertError(IOException.class);
        testObserver.assertNotComplete();
    }

    @Test
    public void retry_shouldRetryOneTimeAndSucceed() {
        // Assign
        TestObserver<String> testObserver = new TestObserver<>();
        TestScheduler testScheduler = new TestScheduler();

        // Act
        createObservable()
                .subscribeOn(testScheduler)
                .retryWhen(new RetryWithDelay(1, 2, TimeUnit.SECONDS, testScheduler))
                .subscribe(testObserver);

        // Assert
        testObserver.assertSubscribed();
        testObserver.assertNoValues();
        testObserver.assertNoErrors();

        testScheduler.advanceTimeTo(1, TimeUnit.SECONDS);
        testObserver.assertNoValues();
        testObserver.assertNoErrors();

        returnCorrectAnswer = true;
        testScheduler.advanceTimeTo(2, TimeUnit.SECONDS);
        testObserver.assertValueCount(1);
        testObserver.assertNoErrors();
    }

    @Test
    public void retry_shouldRetryTwoTimesAndSucceed() {
        // Assign
        TestObserver<String> testObserver = new TestObserver<>();
        TestScheduler testScheduler = new TestScheduler();

        // Act
        createObservable()
                .subscribeOn(testScheduler)
                .retryWhen(new RetryWithDelay(2, 2, TimeUnit.SECONDS, testScheduler))
                .subscribe(testObserver);

        // Assert
        testObserver.assertSubscribed();
        testObserver.assertNoValues();
        testObserver.assertNoErrors();

        testScheduler.advanceTimeTo(2, TimeUnit.SECONDS);
        testObserver.assertNoValues();
        testObserver.assertNoErrors();

        returnCorrectAnswer = true;
        testScheduler.advanceTimeTo(6, TimeUnit.SECONDS);
        testObserver.assertValueCount(1);
        testObserver.assertNoErrors();
    }

    @Test
    public void retry_shouldNotRetryAndShouldFailOnDifferentExceptions() {
        // Assign
        TestObserver testObserver = new TestObserver();
        TestScheduler testScheduler = new TestScheduler();

        // Act
        Observable.create(e -> e.onError(new RuntimeException()))
                .subscribeOn(testScheduler)
                .retryWhen(new RetryWithDelay(1, 2, TimeUnit.SECONDS))
                .subscribe(testObserver);

        // Assert
        testObserver.assertSubscribed();
        testObserver.assertNoValues();
        testObserver.assertNoErrors();

        testScheduler.advanceTimeBy(1, TimeUnit.SECONDS); // Retry is after 2s, so no retry is called
        testObserver.assertValueCount(0);
        Assert.assertEquals(1, testObserver.errorCount());
        testObserver.assertError(RuntimeException.class);
        testObserver.assertNotComplete();
    }

    // Helper
    private Observable<String> createObservable() {
        return Observable.create(e -> {
            System.out.println(returnCorrectAnswer);

            if (returnCorrectAnswer)
                e.onNext("Correct Answer");
            else
                e.onError(new IOException());
        });
    }
}

@jemshit
Copy link
Author

jemshit commented Jan 13, 2021

Screw this, use Coroutine and good-old while loop :D

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment