Skip to content

Instantly share code, notes, and snippets.

@NiteshKant
Created May 20, 2014 01:35
Show Gist options
  • Save NiteshKant/0d2800327e47b61e01bd to your computer and use it in GitHub Desktop.
Save NiteshKant/0d2800327e47b61e01bd to your computer and use it in GitHub Desktop.
TestingTakeUntilAndRetry
package io.reactivex.netty.examples.java;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.subscriptions.SerialSubscription;
import java.util.concurrent.TimeUnit;
/**
* @author Nitesh Kant
*/
public class TestMe {
public static void main(String[] args) {
Observable.interval(1, TimeUnit.SECONDS)
.lift(new Observable.Operator<Long, Long>() {
@Override
public Subscriber<? super Long> call(final Subscriber<? super Long> child) {
final SerialSubscription serialSubscription = new SerialSubscription();
// add serialSubscription so it gets unsubscribed if child is unsubscribed
child.add(serialSubscription);
return new RetryGuy(child, serialSubscription);
}
}).takeUntil(Observable.interval(20, TimeUnit.SECONDS)
.map(new Func1<Long, Object>() {
@Override
public Object call(Long aLong) {
System.out.println("TestMe.call");
return aLong;
}
}))
.toBlockingObservable()
.forEach(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println(aLong);
}
});
}
private static class RetryGuy extends Subscriber<Long> {
private final Subscriber<? super Long> child;
private final SerialSubscription serialSubscription;
public RetryGuy(Subscriber<? super Long> child, SerialSubscription serialSubscription) {
this.child = child;
this.serialSubscription = serialSubscription;
}
@Override
public void onCompleted() {
child.onCompleted();
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onNext(Long integer) {
if (integer == 5) {
System.out.println("Retrying to different source!!!");
RetryGuy retryGuy = new RetryGuy(child, serialSubscription);
serialSubscription.set(retryGuy);
Observable.interval(1, TimeUnit.SECONDS).map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return 200 + aLong;
}
}).subscribe(retryGuy);
} else {
child.onNext(integer);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment