Skip to content

Instantly share code, notes, and snippets.

@schauder
Created April 15, 2017 09:17
Show Gist options
  • Save schauder/7f14d898ef00b85e49d92ecbbf78cd7a to your computer and use it in GitHub Desktop.
Save schauder/7f14d898ef00b85e49d92ecbbf78cd7a to your computer and use it in GitHub Desktop.
A couple of example tests about Multithreading with Reactor
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import java.time.Duration;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
/**
* @author Jens Schauder
*/
public class BlogTest {
static Flux<String> addThread(Flux<?> flux) {
return flux.map(e -> e + " " + Thread.currentThread());
}
static <T> Flux<T> assertThread(Flux<T> flux, String name) {
return flux.doOnNext(
e -> assertThat(Thread.currentThread().getName(),
startsWith(name))
);
}
private final String threadName = Thread.currentThread().getName();
@Test
public void reactorIsSingleThreadedByDefault() {
Flux<Integer> flux = Flux.range(0, 1000);
assertThread(flux, threadName)
.blockLast(Duration.ofSeconds(1));
}
@Test
public void delayingElementsIntroducesThreads() {
Flux<Integer> flux = Flux.range(0, 1000)
.delayElements(Duration.ofMillis(1));
assertThread(flux, "timer")
.blockLast(Duration.ofSeconds(3));
}
@Test
public void publishOn() {
Flux<Integer> flux = Flux.range(0, 1000)
.publishOn(Schedulers.newSingle("newThread"));
assertThread(flux, "newThread")
.blockLast(Duration.ofSeconds(1));
}
@Test
public void subscribeOn() {
Flux<Integer> flux = Flux.range(0, 1000)
.subscribeOn(Schedulers.newSingle("newThread"));
assertThread(flux, "newThread")
.blockLast(Duration.ofSeconds(1));
}
@Test
public void publishOnTwice() {
Flux<Integer> flux = Flux.range(0, 1000);
Flux<Integer> fluxOnOne = assertThread(flux.publishOn(Schedulers.newSingle("one")), "one");
Flux<Integer> fluxOnOneOnTwo = assertThread(fluxOnOne.publishOn(Schedulers.newSingle("two")), "two");
fluxOnOneOnTwo.blockLast(Duration.ofSeconds(1));
}
@Test
public void subscribeOnTwice() {
Flux<Integer> flux = Flux.range(0, 1000);
Flux<Integer> fluxOnOne = assertThread(flux.subscribeOn(Schedulers.newSingle("one")), "one");
Flux<Integer> fluxOnOneOnTwo = assertThread(fluxOnOne.subscribeOn(Schedulers.newSingle("two")), "one");
fluxOnOneOnTwo.blockLast(Duration.ofSeconds(1));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment