Skip to content

Instantly share code, notes, and snippets.

@YannRobert
Created March 30, 2015 13:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save YannRobert/a07ed294dbaccd4c3705 to your computer and use it in GitHub Desktop.
Save YannRobert/a07ed294dbaccd4c3705 to your computer and use it in GitHub Desktop.
Could the defensive code in the RxJava "merge" operator be improved?
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import rx.Observable;
import rx.Observer;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
/**
 * Demonstrates what happens when a Mockito mock of {@link Subject} is subscribed to, either
 * directly or through {@code Observable.merge}.
 *
 * <p>Open question raised by these tests: either the defensive code in the RxJava
 * {@code merge} operator could be improved, or the support for mocked Subjects could be.
 */
@Slf4j
public class IssueInRxJavaDefensiveCodeTest {

    /** Maximum time, in seconds, each test waits for a terminal event to reach the observer. */
    private static final long AWAIT_TIMEOUT_SECONDS = 3;

    /**
     * Baseline: an observer subscribed to a real {@link PublishSubject} is notified of completion.
     *
     * @throws InterruptedException if the thread is interrupted while awaiting the latch
     */
    @Test
    public void shouldAcceptSubscriber() throws InterruptedException {
        Subject<Integer, Integer> subject1 = PublishSubject.create();
        CountDownLatch latch = new CountDownLatch(1);
        subject1.subscribe(new SimpleLoggingObserver<>(latch));
        subject1.onNext(1);
        subject1.onCompleted();
        assertTrue(latch.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
    }

    /**
     * Subscribing directly to a mock Subject is expected to throw an
     * {@link IllegalStateException}; afterwards the observer must never see a terminal event.
     *
     * @throws InterruptedException if the thread is interrupted while awaiting the latch
     */
    @Test
    public void shouldThrowIllegalStateExceptionOnMockSubjectSubscribe() throws InterruptedException {
        Subject<Integer, Integer> subject2 = newStubbedMockSubject();
        CountDownLatch latch = new CountDownLatch(1);
        try {
            subject2.subscribe(new SimpleLoggingObserver<>(latch));
            fail("Expected IllegalStateException when subscribing to a mock Subject");
        } catch (IllegalStateException e) {
            // NOTE(review): presumably raised by RxJava itself because the mock instance has no
            // onSubscribe function set - confirm against the RxJava version in use.
            assertEquals("onSubscribe function can not be null.", e.getMessage());
        }
        subject2.onNext(1);
        subject2.onCompleted();
        // The subscription failed, so neither onCompleted nor onError may release the latch.
        assertFalse(latch.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
    }

    /**
     * Subscribing to a merge of a real Subject and a mock Subject does not throw here, yet the
     * observer receives neither {@code onCompleted} nor {@code onError}.
     *
     * <p>As reported by the original author, a {@link NullPointerException} occurs under the
     * hood and no error is reported to the Observer. Despite the method name, this test does
     * not expect an {@link IllegalStateException}; the name is kept unchanged for compatibility.
     *
     * @throws InterruptedException if the thread is interrupted while awaiting the latch
     */
    @Test
    public void shouldThrowIllegalStateExceptionOnMockSubjectSubscribeByMerge() throws InterruptedException {
        Subject<Integer, Integer> subject1 = PublishSubject.create();
        Subject<Integer, Integer> subject2 = newStubbedMockSubject();
        CountDownLatch latch = new CountDownLatch(1);
        Observable.merge(subject1, subject2).subscribe(new SimpleLoggingObserver<>(latch));
        subject1.onNext(1);
        subject1.onCompleted();
        subject2.onNext(1);
        subject2.onCompleted();
        // No terminal event is expected to reach the observer within the timeout.
        assertFalse(latch.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
    }

    /**
     * Creates a mock {@link Subject} whose default answer logs the invoked method name and then
     * throws {@link UnsupportedOperationException}, with {@code onNext(int)} and
     * {@code onCompleted()} stubbed to do nothing.
     *
     * @return the stubbed mock Subject
     */
    // Mockito can only be handed the raw Subject.class, so the generic assignment is unchecked.
    @SuppressWarnings("unchecked")
    private static Subject<Integer, Integer> newStubbedMockSubject() {
        Subject<Integer, Integer> mockSubject = mock(Subject.class, new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                log.info("Calling Method {} ...", invocationOnMock.getMethod().getName());
                throw new UnsupportedOperationException(invocationOnMock.getMethod().getName());
            }
        });
        doNothing().when(mockSubject).onNext(anyInt());
        doNothing().when(mockSubject).onCompleted();
        return mockSubject;
    }

    /**
     * Observer that logs every event and counts down the given latch on either terminal event
     * ({@code onCompleted} or {@code onError}).
     *
     * @param <T> type of the observed values
     */
    private static class SimpleLoggingObserver<T> implements Observer<T> {

        /** Latch counted down once when a terminal event is received. */
        private final CountDownLatch latch;

        /**
         * @param latch latch to count down when the stream terminates
         */
        public SimpleLoggingObserver(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void onCompleted() {
            log.info("Completed");
            latch.countDown();
        }

        @Override
        public void onError(Throwable e) {
            log.info("Error", e);
            latch.countDown();
        }

        @Override
        public void onNext(T value) {
            log.info("next = {}", value);
        }
    }
}
@YannRobert
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment