Created
October 15, 2017 21:17
-
-
Save franciscojunior/981e0203035ecfd1dfcdab7a72de3f7e to your computer and use it in GitHub Desktop.
RxJava test samples
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class PlaygroundTests { | |
// Template test using Observables | |
// Reference: http://docs.couchbase.com/developer/java-2.0/observables.html | |
@Test | |
public void test_subscribe0() throws InterruptedException { | |
final CountDownLatch latch = new CountDownLatch(1); | |
final AtomicReference<String> onSubscribeThreadName = new AtomicReference<>(); | |
final AtomicInteger received = new AtomicInteger(); | |
final AtomicReference<String> subscriberThreadName = new AtomicReference<>(); | |
Observable | |
.just(getValue()) | |
.doOnSubscribe(new Consumer<Disposable>() { | |
@Override | |
public void accept(Disposable disposable) throws Exception { | |
onSubscribeThreadName.set(Thread.currentThread().getName()); | |
} | |
}) | |
.subscribeOn(Schedulers.io()) | |
.subscribe(new Consumer<Integer>() { | |
@Override | |
public void accept(Integer integer) throws Exception { | |
subscriberThreadName.set(Thread.currentThread().getName()); | |
received.set(integer); | |
latch.countDown(); | |
} | |
}); | |
latch.await(); | |
System.out.println(onSubscribeThreadName.get()); | |
System.out.println(subscriberThreadName.get()); | |
} | |
@Test | |
public void test_subscribe1() throws InterruptedException { | |
int rounds = 1; | |
final AtomicReference<String> onSubscribeThreadName = new AtomicReference<>(); | |
final AtomicInteger received = new AtomicInteger(); | |
final AtomicReference<String> subscriberThreadName = new AtomicReference<>(); | |
long start = System.currentTimeMillis(); | |
for (int i = 0; i < rounds; i++) { | |
final CountDownLatch latch = new CountDownLatch(1); | |
Disposable subscription = Observable | |
.just(1) | |
.doOnSubscribe(new Consumer<Disposable>() { | |
@Override | |
public void accept(Disposable disposable) throws Exception { | |
onSubscribeThreadName.set(Thread.currentThread().getName()); | |
} | |
}) | |
.subscribeOn(Schedulers.io()) | |
.subscribe(new Consumer<Integer>() { | |
@Override | |
public void accept(Integer integer) throws Exception { | |
subscriberThreadName.set(Thread.currentThread().getName()); | |
received.set(integer); | |
latch.countDown(); | |
} | |
}); | |
latch.await(); | |
subscription.dispose(); | |
} | |
System.out.println(onSubscribeThreadName.get()); | |
System.out.println(subscriberThreadName.get()); | |
System.out.println(String.format("time spent (ms): %s", System.currentTimeMillis() - start)); | |
} | |
@Test | |
public void test_subscribe2() throws InterruptedException { | |
final AtomicReference<String> onSubscribeThreadName = new AtomicReference<>(); | |
final AtomicInteger received = new AtomicInteger(); | |
final AtomicReference<String> subscriberThreadName = new AtomicReference<>(); | |
int rounds = 1; | |
long start = System.currentTimeMillis(); | |
for (int i = 0; i < rounds; i++) { | |
final CountDownLatch latch = new CountDownLatch(1); | |
Disposable subscription = Observable.create(new ObservableOnSubscribe<Integer>() { | |
@Override | |
public void subscribe(ObservableEmitter<Integer> e) throws Exception { | |
e.onNext(1); | |
e.onComplete(); | |
} | |
}) | |
.doOnSubscribe(new Consumer<Disposable>() { | |
@Override | |
public void accept(Disposable disposable) throws Exception { | |
onSubscribeThreadName.set(Thread.currentThread().getName()); | |
} | |
}) | |
.subscribeOn(Schedulers.io()) | |
.subscribe(new Consumer<Integer>() { | |
@Override | |
public void accept(Integer integer) throws Exception { | |
subscriberThreadName.set(Thread.currentThread().getName()); | |
received.set(integer); | |
latch.countDown(); | |
} | |
}); | |
latch.await(); | |
subscription.dispose(); | |
} | |
System.out.println(onSubscribeThreadName.get()); | |
System.out.println(subscriberThreadName.get()); | |
System.out.println(String.format("time spent (ms): %s", System.currentTimeMillis() - start)); | |
} | |
@Test | |
public void test_subscribe3() throws InterruptedException { | |
final AtomicReference<String> onSubscribeThreadName = new AtomicReference<>(); | |
final AtomicInteger received = new AtomicInteger(); | |
final AtomicReference<String> subscriberThreadName = new AtomicReference<>(); | |
int rounds = 1; | |
long start = System.currentTimeMillis(); | |
for (int i = 0; i < rounds; i++) { | |
final CountDownLatch latch = new CountDownLatch(1); | |
Disposable subscription = Observable | |
.fromCallable(new Callable<Integer>() { | |
@Override | |
public Integer call() throws Exception { | |
return 1; | |
} | |
}) | |
.doOnSubscribe(new Consumer<Disposable>() { | |
@Override | |
public void accept(Disposable disposable) throws Exception { | |
onSubscribeThreadName.set(Thread.currentThread().getName()); | |
} | |
}) | |
.subscribeOn(Schedulers.io()) | |
.subscribe(new Consumer<Integer>() { | |
@Override | |
public void accept(Integer integer) throws Exception { | |
subscriberThreadName.set(Thread.currentThread().getName()); | |
received.set(integer); | |
latch.countDown(); | |
} | |
}); | |
latch.await(); | |
subscription.dispose(); | |
} | |
System.out.println(onSubscribeThreadName.get()); | |
System.out.println(subscriberThreadName.get()); | |
System.out.println(String.format("time spent (ms): %s", System.currentTimeMillis() - start)); | |
} | |
private int getValue() { | |
System.out.println(Thread.currentThread().getName()); | |
return 1; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment