Skip to content

Instantly share code, notes, and snippets.

@Dorus
Last active April 7, 2016 23:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Dorus/71c8f954e9e9e13dc0183f0197279408 to your computer and use it in GitHub Desktop.
Save Dorus/71c8f954e9e9e13dc0183f0197279408 to your computer and use it in GitHub Desktop.
package RxTest.RxTest;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import rx.Notification;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
/**
 * Small RxJava 1.x experiment: three branches, each gated by
 * {@code delaySubscription}, are merged with {@code mergeDelayError} and
 * retried individually, while every notification is printed to stdout.
 *
 * <p>The two counters are shared by all branches. Timers fire on scheduler
 * threads and {@code retry()} resubscribes from whichever thread delivered
 * the error, so the counters are atomic rather than plain fields.
 */
public class delaySub {
    /** Number of gate observables created so far; also determines the next gate's timer length. */
    private final AtomicLong delay = new AtomicLong();

    /** Number of times the mapping function returned by {@link #getResult()} has been invoked. */
    private final AtomicInteger count = new AtomicInteger();

    /** Reference instant (epoch millis) for the elapsed-time figures in the log output. */
    final long start = System.currentTimeMillis();

    /**
     * Builds a single-value source whose subscription is postponed until a
     * fresh gate from {@link #getDelayObs()} emits.
     *
     * @return an observable emitting the current gate counter value
     */
    private Observable<Long> getSubDelayObs() {
        // The emitted value is read here, at assembly time; the gate itself is
        // created lazily, once per (re)subscription.
        return Observable.just(delay.get()).delaySubscription(() -> getDelayObs());
    }

    /**
     * Creates the next gate: a timer that logs when it is subscribed to and
     * when it completes.
     *
     * @return a timer observable for gate number {@code del}
     */
    private Observable<Long> getDelayObs() {
        final long del = delay.getAndIncrement();
        // Gate number del waits del * (del + 1) seconds: 0, 2, 6, 12, ...
        return Observable.timer((del + 1) * del, TimeUnit.SECONDS)
                .doOnSubscribe(() -> System.out.println("starting " + del + " " + elapsedMillis()))
                .doOnCompleted(() -> System.out.println("emit " + del + " " + elapsedMillis()));
    }

    /** Returns the milliseconds elapsed since this instance was created. */
    private long elapsedMillis() {
        return System.currentTimeMillis() - start;
    }

    /**
     * Merges three independently retried branches (labelled 1, 2 and 3) and
     * logs the merged stream under the label {@code "4"}.
     *
     * @return the merged stream of result strings
     */
    public Observable<String> m1() {
        return Observable
                .mergeDelayError(branch("1"), branch("2"), branch("3"))
                .doOnEach(printAction("4"));
    }

    /**
     * Builds one branch: gated source, logged as {@code label + "a"}, mapped
     * through {@link #getResult()}, logged as {@code label + "b"}, then
     * retried on error.
     *
     * @param label prefix used for the two log labels of this branch
     * @return the retried branch
     */
    private Observable<String> branch(String label) {
        return getSubDelayObs()
                .doOnEach(printAction(label + "a"))
                .flatMap(getResult())
                .doOnEach(printAction(label + "b"))
                .retry();
    }

    /**
     * Creates a {@code doOnEach} callback that prints every notification,
     * prefixed with {@code name}.
     *
     * @param name label printed in front of each notification
     * @param <T>  element type of the observed stream
     * @return the logging callback
     */
    private <T> Action1<Notification<? super T>> printAction(String name) {
        return notification -> {
            // A notification is exactly one of completed / error / next.
            if (notification.isOnCompleted()) {
                System.out.println(name + " Completed");
            } else if (notification.isOnError()) {
                System.out.println(name + " Error " + notification.getThrowable().getMessage());
            } else if (notification.isOnNext()) {
                System.out.println(name + " Next" + notification.getValue());
            }
        };
    }

    /**
     * Returns the mapping function used by every branch. Across all branches
     * combined, the first three invocations fail, the fourth yields a value
     * delayed by one second, and every later invocation yields
     * {@code "Done!"} immediately.
     *
     * @return the function applied inside {@code flatMap}; its input is ignored
     */
    private Func1<Long, Observable<String>> getResult() {
        return e -> {
            int cur = count.getAndIncrement();
            switch (cur) {
                case 0:
                case 1:
                case 2:
                    return Observable.error(new Exception("Something wrong!"));
                case 3:
                    return Observable.just("Sucess").delay(1, TimeUnit.SECONDS);
                default:
                    return Observable.just("Done!");
            }
        };
    }

    /**
     * Runs the experiment, printing the first merged result, any terminal
     * error message, and completion.
     *
     * @param args unused
     * @throws IOException if reading from standard input fails
     */
    public static void main(String[] args) throws IOException {
        new delaySub().m1().take(1).subscribe(
                e -> System.out.println(e),
                err -> System.out.println(err.getMessage()),
                () -> System.out.println("Completed"));
        // Block the main thread on stdin so the asynchronous pipeline has time to run.
        System.in.read();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment