Skip to content

Instantly share code, notes, and snippets.

@ruslansennov
Last active April 20, 2018 05:34
Show Gist options
  • Save ruslansennov/9bb087b85df6f2ec349cdc3f6e40f4aa to your computer and use it in GitHub Desktop.
Save ruslansennov/9bb087b85df6f2ec349cdc3f6e40f4aa to your computer and use it in GitHub Desktop.
package example;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.rxjava.core.RxHelper;
import io.vertx.rxjava.core.Vertx;
import io.vertx.test.core.VertxTestBase;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import rx.Notification;
import rx.Scheduler;
import rx.Single;
import rx.functions.Action1;
@RunWith(VertxUnitRunner.class)
public class SchedulersTest extends VertxTestBase {
private static Single.OnSubscribe<Void> onSubscribe() {
return f -> {
System.out.println("subscribed: " + Thread.currentThread());
f.onSuccess(null);
};
}
private static Action1<Notification<? extends Void>> echoThread() {
return o -> System.out.println("current: " + Thread.currentThread());
}
private Scheduler worker;
private Scheduler eventLoop;
@Before
public void before() throws Exception {
super.before();
Vertx rxVertx = new Vertx(vertx);
worker = RxHelper.blockingScheduler(rxVertx);
eventLoop = RxHelper.scheduler(rxVertx);
}
@Test
public void test1() {
Single.create(onSubscribe()) // Thread[main,5,main]
.test().awaitTerminalEvent();
}
@Test
public void test2() {
Single.create(onSubscribe()) // Thread[vert.x-worker-thread-0,5,main]
.subscribeOn(worker)
.test().awaitTerminalEvent();
}
@Test
public void test3() {
Single.create(onSubscribe()) // Thread[vert.x-eventloop-thread-0,5,main]
.subscribeOn(eventLoop)
.test().awaitTerminalEvent();
}
@Test
public void test4() {
Single.create(onSubscribe()) // Thread[main,5,main]
.observeOn(eventLoop)
.doOnEach(echoThread()) // Thread[vert.x-eventloop-thread-0,5,main]
.test().awaitTerminalEvent();
}
@Test
public void test5() {
Single.create(onSubscribe()) // Thread[vert.x-worker-thread-0,5,main]
.subscribeOn(worker)
.observeOn(eventLoop)
.doOnEach(echoThread()) // Thread[vert.x-eventloop-thread-0,5,main]
.test().awaitTerminalEvent();
}
@Test
public void test6() {
Single.create(onSubscribe()) // Thread[vert.x-worker-thread-0,5,main]
.subscribeOn(worker)
.observeOn(eventLoop)
.doOnEach(echoThread()) // Thread[vert.x-eventloop-thread-0,5,main]
.subscribeOn(worker) // does not change anything
.doOnEach(echoThread()) // Thread[vert.x-eventloop-thread-0,5,main]
.test().awaitTerminalEvent();
}
@Test
public void test7() {
Single.create(onSubscribe()) // Thread[vert.x-worker-thread-0,5,main]
.subscribeOn(worker)
.doOnEach(echoThread()) // Thread[vert.x-worker-thread-0,5,main]
.subscribeOn(eventLoop) // does not change anything
.doOnEach(echoThread()) // Thread[vert.x-worker-thread-0,5,main]
.test().awaitTerminalEvent();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment