Skip to content

Instantly share code, notes, and snippets.

@franciscojunior
Created October 15, 2017 21:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save franciscojunior/981e0203035ecfd1dfcdab7a72de3f7e to your computer and use it in GitHub Desktop.
Save franciscojunior/981e0203035ecfd1dfcdab7a72de3f7e to your computer and use it in GitHub Desktop.
RxJava test samples
/**
 * Playground tests that subscribe to single-value {@code Observable}s built in
 * different ways ({@code just}, {@code create}, {@code fromCallable}), always
 * with {@code subscribeOn(Schedulers.io())}, and print the name of the thread
 * that ran the {@code doOnSubscribe} hook and the name of the thread that ran
 * the {@code onNext} consumer so the two can be compared by eye.
 *
 * <p>Each round waits for its value with a bounded timeout and installs an
 * error consumer, so a source that fails or never emits makes the test fail
 * instead of blocking the test run forever.
 */
public class PlaygroundTests {

    /** Upper bound, in seconds, on how long one round waits for its value before the test fails. */
    private static final long AWAIT_TIMEOUT_SECONDS = 5;

    /** The value every source in this class emits. */
    private static final int EXPECTED_VALUE = 1;

    // Template test using Observables
    // Reference: http://docs.couchbase.com/developer/java-2.0/observables.html
    @Test
    public void test_subscribe0() throws InterruptedException {
        final Probe probe = new Probe();
        // getValue() is an ordinary method argument, so it runs (and prints its thread name)
        // right here on the calling thread, before anything is subscribed.
        runRound(Observable.just(getValue()), probe);
        probe.printThreadNames();
    }

    /** Times {@code rounds} subscriptions to a source built with {@code Observable.just}. */
    @Test
    public void test_subscribe1() throws InterruptedException {
        final int rounds = 1;
        final Probe probe = new Probe();
        final long startNanos = System.nanoTime();
        for (int i = 0; i < rounds; i++) {
            runRound(Observable.just(1), probe);
        }
        probe.printThreadNames();
        printElapsed(startNanos);
    }

    /** Times {@code rounds} subscriptions to a source built with {@code Observable.create}. */
    @Test
    public void test_subscribe2() throws InterruptedException {
        final int rounds = 1;
        final Probe probe = new Probe();
        final long startNanos = System.nanoTime();
        for (int i = 0; i < rounds; i++) {
            runRound(Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    e.onNext(1);
                    e.onComplete();
                }
            }), probe);
        }
        probe.printThreadNames();
        printElapsed(startNanos);
    }

    /** Times {@code rounds} subscriptions to a source built with {@code Observable.fromCallable}. */
    @Test
    public void test_subscribe3() throws InterruptedException {
        final int rounds = 1;
        final Probe probe = new Probe();
        final long startNanos = System.nanoTime();
        for (int i = 0; i < rounds; i++) {
            runRound(Observable.fromCallable(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    return 1;
                }
            }), probe);
        }
        probe.printThreadNames();
        printElapsed(startNanos);
    }

    /**
     * Subscribes to {@code source} with {@code subscribeOn(Schedulers.io())}, records the
     * observed thread names and value in {@code probe}, and blocks until the value arrives.
     *
     * @param source the single-value source under observation
     * @param probe  receives the thread names and the emitted value
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws AssertionError       if nothing arrives within {@link #AWAIT_TIMEOUT_SECONDS},
     *                              if the source signals an error, or if the value is not
     *                              {@link #EXPECTED_VALUE}
     */
    private static void runRound(Observable<Integer> source, final Probe probe)
            throws InterruptedException {
        final CountDownLatch done = new CountDownLatch(1);
        final AtomicReference<Throwable> failure = new AtomicReference<>();

        final Disposable subscription = source
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        probe.onSubscribeThreadName.set(Thread.currentThread().getName());
                    }
                })
                .subscribeOn(Schedulers.io())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer value) throws Exception {
                        probe.subscriberThreadName.set(Thread.currentThread().getName());
                        probe.received.set(value);
                        done.countDown();
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable error) throws Exception {
                        // Release the waiting test thread so the error is reported, not hung on.
                        failure.set(error);
                        done.countDown();
                    }
                });

        try {
            // Fully qualified so the file's import list does not have to change.
            if (!done.await(AWAIT_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS)) {
                throw new AssertionError(
                        "no value received within " + AWAIT_TIMEOUT_SECONDS + " seconds");
            }
        } finally {
            // Always release the subscription, including on timeout or interruption.
            subscription.dispose();
        }

        final Throwable error = failure.get();
        if (error != null) {
            throw new AssertionError("source signalled an error instead of a value", error);
        }
        if (probe.received.get() != EXPECTED_VALUE) {
            throw new AssertionError(
                    "expected " + EXPECTED_VALUE + " but received " + probe.received.get());
        }
    }

    /**
     * Prints the time elapsed since {@code startNanos}, in milliseconds.
     *
     * @param startNanos a reading of {@link System#nanoTime()} taken when timing began
     */
    private static void printElapsed(long startNanos) {
        final long elapsedMs =
                java.util.concurrent.TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        System.out.println(String.format("time spent (ms): %s", elapsedMs));
    }

    /** Prints the current thread's name and returns the value to emit. */
    private int getValue() {
        System.out.println(Thread.currentThread().getName());
        return 1;
    }

    /** Holds what one test observed; written from callback threads, read by the test thread. */
    private static final class Probe {
        /** Name of the thread that ran the {@code doOnSubscribe} hook. */
        final AtomicReference<String> onSubscribeThreadName = new AtomicReference<>();
        /** Name of the thread that ran the {@code onNext} consumer. */
        final AtomicReference<String> subscriberThreadName = new AtomicReference<>();
        /** The most recently received value. */
        final AtomicInteger received = new AtomicInteger();

        /** Prints the {@code doOnSubscribe} thread name, then the {@code onNext} thread name. */
        void printThreadNames() {
            System.out.println(onSubscribeThreadName.get());
            System.out.println(subscriberThreadName.get());
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment