Skip to content

Instantly share code, notes, and snippets.

@praseodym
Created August 9, 2016 19:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save praseodym/3a87ea5e98ee1ab1bbcbc6c9ec6f0b85 to your computer and use it in GitHub Desktop.
Save praseodym/3a87ea5e98ee1ab1bbcbc6c9ec6f0b85 to your computer and use it in GitHub Desktop.
package reactor.core;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.test.TestSubscriber;
/**
 * Exercises a {@link ConnectableFlux} obtained via {@code publish()} with two
 * subscribers, one of which throws from a {@code doOnNext} callback, both with
 * and without a {@code publishOn(Schedulers.elastic())} step in front of the
 * {@code publish()} call.
 */
public class ConnectableFluxPublishErrorTest {

    // Shared, immutable logger: one instance per class rather than one per test instance.
    private static final Logger logger = LoggerFactory.getLogger(ConnectableFluxPublishErrorTest.class);

    /**
     * Builds the source used by every test: the integers 1 through 5 with
     * {@code log()} applied.
     *
     * @return a fresh {@code Flux} emitting 1..5
     */
    private Flux<Integer> flux() {
        return Flux.range(1, 5).log();
    }

    /**
     * Callback used to simulate a failing subscriber side effect.
     *
     * @param i the element currently being processed
     * @throws RuntimeException with message {@code "test"} when {@code i} is 3
     */
    private void exceptionTest(int i) {
        if (i == 3) {
            logger.info("Throwing exception");
            throw new RuntimeException("test");
        }
    }

    /**
     * Without a scheduler hop: the test is declared to expect
     * {@code Exceptions.BubblingException} once the second subscriber's
     * {@code doOnNext} callback throws.
     */
    @Test(expected = Exceptions.BubblingException.class)
    public void testDefaultRegular() {
        ConnectableFlux<Integer> p = flux().publish();
        p.subscribe(i -> logger.info("s1: {}", i));
        p.doOnNext(this::exceptionTest).subscribe(i -> logger.info("s2: {}", i));
        p.connect();
    }

    /**
     * Same pipeline as {@link #testDefaultRegular()} but with
     * {@code publishOn(Schedulers.elastic())} inserted before {@code publish()}.
     * The test is still declared to expect {@code Exceptions.BubblingException}.
     */
    @Test(expected = Exceptions.BubblingException.class)
    public void testElasticRegular() {
        ConnectableFlux<Integer> p = flux().publishOn(Schedulers.elastic()).publish();
        p.subscribe(i -> logger.info("s1: {}", i));
        p.doOnNext(this::exceptionTest).subscribe(i -> logger.info("s2: {}", i));
        p.connect();
        // This test does not throw an error!
        // NOTE(review): presumably the callback runs on a scheduler thread, so its
        // exception never reaches the thread calling connect() -- confirm.
    }

    /**
     * Elastic variant using {@code TestSubscriber}s instead of lambda
     * subscribers: asserts that the plain subscriber sees neither an error nor
     * completion, and that the subscriber behind the throwing {@code doOnNext}
     * sees an error and no completion.
     */
    @Test
    public void testFluxElasticTestSubscriber() {
        TestSubscriber<Integer> ts1 = TestSubscriber.create();
        TestSubscriber<Integer> ts2 = TestSubscriber.create();
        ConnectableFlux<Integer> p = flux().publishOn(Schedulers.elastic()).publish();
        p.subscribe(ts1);
        p.doOnNext(this::exceptionTest).subscribe(ts2);
        p.connect();
        // NOTE(review): the assertions below run immediately after connect() with no
        // explicit wait; with an elastic scheduler this may be timing-dependent -- confirm.
        ts1.assertNoError().assertNotComplete();
        ts2.assertError().assertNotComplete();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment