Skip to content

Instantly share code, notes, and snippets.

@snodnipper
Created May 22, 2015 17:24
Show Gist options
  • Save snodnipper/8531cd918e5f23101526 to your computer and use it in GitHub Desktop.
Save snodnipper/8531cd918e5f23101526 to your computer and use it in GitHub Desktop.
RxJava mergeWith demo
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
import rx.functions.Func2;
import java.util.concurrent.TimeUnit;
/**
 * Small console demo of {@code Observable.mergeWith} with two sources that emit from
 * blocking loops and specify no scheduler.
 */
public class RxJavaMergeWith {

    /**
     * Runs the demo and then pauses for 60 seconds before exiting.
     *
     * @param args ignored
     * @throws InterruptedException if the thread is interrupted during the final pause
     */
    public static void main(String[] args) throws InterruptedException {
        mergeWith();
        // NOTE(review): with the blocking sources below, the expected output suggests the
        // subscription has already finished by the time this line runs; the pause presumably
        // only matters if the sources are moved onto background schedulers.
        Thread.sleep(60000);
    }

    /**
     * mergeWith
     *
     * 0 seconds. Next: 1 from observable 1
     * 2 seconds. Next: 2 from observable 1
     * 4 seconds. Next: 3 from observable 1
     * 6 seconds. Next: 4 from observable 1
     * 8 seconds. Next: 5 from observable 1
     * 10 seconds. Next: 1 from observable 2
     * 15 seconds. Next: 2 from observable 2
     * 20 seconds. Next: 3 from observable 2
     * completed...
     *
     * The items are not interleaved: each source pauses inside its own emission loop and no
     * scheduler is applied, so (as the timings above show) the second source only starts
     * emitting once the first has finished. Each source also pauses after its last item, so
     * "completed..." is printed after the second source's final 5 second pause.
     */
    private static void mergeWith() {
        Observable<String> i1 = delayedSource("observable 1", 5, 2000);
        Observable<String> i2 = delayedSource("observable 2", 3, 5000);

        // Monotonic clock: elapsed time is not affected by wall-clock adjustments.
        // Declared final so the anonymous Subscriber can capture it on any Java version.
        final long then = System.nanoTime();

        i1.mergeWith(i2).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("completed...");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("error: " + e);
            }

            @Override
            public void onNext(String value) {
                // Whole seconds since subscription, truncated.
                long totalTime = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - then);
                System.out.println(totalTime + " seconds. Next: " + value);
            }
        });
    }

    /**
     * Builds a source that emits {@code "<i> from <label>"} for {@code i = 1..count},
     * sleeping {@code pauseMillis} after every item (including the last) and then completing.
     *
     * The loop stops early when the subscriber has unsubscribed. If the emitting thread is
     * interrupted while pausing, the interrupt flag is restored and the interruption is
     * delivered to the subscriber through {@code onError} instead of being swallowed.
     *
     * @param label       text appended after "from" in every emitted item
     * @param count       number of items to emit
     * @param pauseMillis pause after each item, in milliseconds
     * @return an observable that performs the emissions on whichever thread subscribes to it
     */
    private static Observable<String> delayedSource(final String label, final int count,
            final long pauseMillis) {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                for (int i = 1; i <= count; i++) {
                    if (subscriber.isUnsubscribed()) {
                        // Nobody is listening any more; do not keep blocking the thread.
                        return;
                    }
                    subscriber.onNext(i + " from " + label);
                    try {
                        Thread.sleep(pauseMillis);
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to callers further up the stack.
                        Thread.currentThread().interrupt();
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onError(e);
                        }
                        return;
                    }
                }
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onCompleted();
                }
            }
        });
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment