Skip to content

Instantly share code, notes, and snippets.

@tynn
Last active December 29, 2016 11:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save tynn/74da1dfa02f0dbe7497d53482f1f6ab7 to your computer and use it in GitHub Desktop.
Save tynn/74da1dfa02f0dbe7497d53482f1f6ab7 to your computer and use it in GitHub Desktop.
Rx challenge
apply plugin: 'java'
repositories {
jcenter()
}
sourceSets {
main.java {
srcDir "$projectDir"
include 'ObservableToggleInit.java'
}
test.java {
srcDir "$projectDir"
include 'ObservableToggle.java'
}
}
dependencies {
compile 'io.reactivex:rxjava:1.2.2'
testCompile 'junit:junit:4.12'
}
import org.junit.Before;
import org.junit.Test;
import rx.Observable;
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;
import rx.subjects.ReplaySubject;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* <p>
* The goal of this challenge is to create an {@link rx.Observable} which emits values only if
* another {@link rx.Observable} emitted {@code true} last.
* </p><p>
* For this create a class {@code ObservableToggleInit} extending {@link rx.Observable.Transformer}
* with a constructor {@code ObservableToggleInit#ObservableToggleInit(Observable)}.
* </p><p>
* The challenge is solved when all tests are green. Please comment on why a particular approach
* was used and why alternatives were less feasible.
* </p>
*/
public class ObservableToggle {
TestSubscriber<Integer> subscriber;
PublishSubject<Boolean> onActivate;
ReplaySubject<Integer> onNext;
Observable<Integer> observable;
@Before
public void setup() {
subscriber = TestSubscriber.create();
onActivate = PublishSubject.create();
onNext = ReplaySubject.create();
observable = onNext.compose(new ObservableToggleInit(onActivate.asObservable()));
observable.subscribe(subscriber);
}
@Test
public void completed_onActivate_active() {
onActivate.onNext(true);
onNext.onNext(1);
onActivate.onNext(false);
onNext.onNext(2);
onActivate.onNext(true);
onActivate.onCompleted();
onNext.onNext(3);
assertFalse(onActivate.hasObservers());
assertTrue(onNext.hasObservers());
subscriber.assertNoErrors();
subscriber.assertNotCompleted();
subscriber.assertValueCount(2);
subscriber.assertValues(1, 3);
subscriber.unsubscribe();
assertFalse(onNext.hasObservers());
subscriber.assertUnsubscribed();
}
@Test
public void completed_onActivate_inactive() {
onActivate.onNext(true);
onActivate.onNext(false);
onNext.onNext(1);
onNext.onNext(2);
onActivate.onCompleted();
onNext.onNext(3);
assertFalse(onActivate.hasObservers());
assertFalse(onNext.hasObservers());
subscriber.assertNoErrors();
subscriber.assertCompleted();
subscriber.assertNoValues();
}
@Test
public void completed_onNext_active() throws Exception {
onActivate.onNext(true);
onNext.onCompleted();
assertFalse(onActivate.hasObservers());
assertFalse(onNext.hasObservers());
subscriber.assertNoErrors();
subscriber.assertCompleted();
subscriber.assertNoValues();
}
@Test
public void completed_onNext_completed_onActivate_active() throws Exception {
onActivate.onNext(true);
onNext.onNext(1);
onActivate.onNext(false);
onNext.onNext(2);
onActivate.onNext(true);
onActivate.onCompleted();
onNext.onNext(3);
onNext.onCompleted();
assertFalse(onActivate.hasObservers());
assertFalse(onNext.hasObservers());
subscriber.assertNoErrors();
subscriber.assertCompleted();
subscriber.assertValues(1, 3);
}
@Test
public void completed_onNext_inactive() throws Exception {
onNext.onCompleted();
assertFalse(onActivate.hasObservers());
assertFalse(onNext.hasObservers());
subscriber.assertNoErrors();
subscriber.assertCompleted();
subscriber.assertNoValues();
}
@Test
public void error_onActivate_active() throws Exception {
Throwable throwable = new Throwable("error");
onActivate.onNext(true);
onNext.onNext(1);
onNext.onNext(2);
onActivate.onError(throwable);
subscriber.assertError(throwable);
subscriber.assertNotCompleted();
subscriber.assertValueCount(2);
subscriber.assertValues(1, 2);
}
@Test
public void error_onActivate_inactive() throws Exception {
Throwable throwable = new Throwable("error");
onNext.onNext(1);
onNext.onNext(2);
onActivate.onError(throwable);
subscriber.assertError(throwable);
subscriber.assertNotCompleted();
subscriber.assertNotCompleted();
subscriber.assertNoValues();
}
@Test
public void error_onNext_active() throws Exception {
Throwable throwable = new Throwable("error");
onActivate.onNext(true);
onNext.onNext(1);
onNext.onNext(2);
onNext.onError(throwable);
subscriber.assertError(throwable);
subscriber.assertNotCompleted();
subscriber.assertValueCount(2);
subscriber.assertValues(1, 2);
}
@Test
public void error_onNext_inactive() throws Exception {
Throwable throwable = new Throwable("error");
onNext.onNext(1);
onNext.onNext(2);
onNext.onError(throwable);
subscriber.assertError(throwable);
subscriber.assertNotCompleted();
subscriber.assertNoValues();
}
@Test
public void sequence_active() throws Exception {
onActivate.onNext(true);
onNext.onNext(1);
onNext.onNext(2);
onNext.onNext(3);
onNext.onCompleted();
subscriber.assertNoErrors();
subscriber.assertCompleted();
subscriber.assertValueCount(3);
subscriber.assertValues(1, 2, 3);
}
@Test
public void sequence_inactive() throws Exception {
onNext.onNext(1);
onNext.onNext(2);
onNext.onNext(3);
onNext.onCompleted();
subscriber.assertNoErrors();
subscriber.assertCompleted();
subscriber.assertNoValues();
}
@Test
public void sequence_multiple_active() throws Exception {
onNext.onNext(1);
onActivate.onNext(true);
onNext.onNext(2);
onActivate.onNext(false);
onNext.onNext(3);
onActivate.onNext(true);
onNext.onNext(4);
onActivate.onNext(false);
onNext.onNext(5);
onNext.onCompleted();
subscriber.assertNoErrors();
subscriber.assertCompleted();
subscriber.assertValueCount(2);
subscriber.assertValues(2, 4);
}
@Test
public void sequence_single_active() throws Exception {
onNext.onNext(1);
onActivate.onNext(true);
onNext.onNext(2);
onActivate.onNext(false);
onNext.onNext(3);
onNext.onCompleted();
subscriber.assertNoErrors();
subscriber.assertCompleted();
subscriber.assertValueCount(1);
subscriber.assertValue(2);
}
@Test
public void single_active() throws Exception {
onActivate.onNext(true);
onNext.onNext(1);
onNext.onCompleted();
subscriber.assertNoErrors();
subscriber.assertCompleted();
subscriber.assertValueCount(1);
subscriber.assertValue(1);
}
@Test
public void single_inactive() throws Exception {
onNext.onNext(1);
onNext.onCompleted();
subscriber.assertNoErrors();
subscriber.assertCompleted();
subscriber.assertNoValues();
}
@Test
public void stream_active() throws Exception {
onActivate.onNext(true);
onNext.onNext(1);
onNext.onNext(2);
onNext.onNext(3);
subscriber.assertNoErrors();
subscriber.assertNotCompleted();
subscriber.assertValueCount(3);
subscriber.assertValues(1, 2, 3);
}
@Test
public void stream_inactive() throws Exception {
onNext.onNext(1);
onNext.onNext(2);
onNext.onNext(3);
subscriber.assertNoErrors();
subscriber.assertNotCompleted();
subscriber.assertNoValues();
}
@Test
public void stream_multiple_active() throws Exception {
onNext.onNext(1);
onActivate.onNext(true);
onNext.onNext(2);
onActivate.onNext(false);
onNext.onNext(3);
onActivate.onNext(true);
onNext.onNext(4);
onActivate.onNext(false);
onNext.onNext(5);
subscriber.assertNoErrors();
subscriber.assertNotCompleted();
subscriber.assertValueCount(2);
subscriber.assertValues(2, 4);
}
@Test
public void stream_single_active() throws Exception {
onNext.onNext(1);
onActivate.onNext(true);
onNext.onNext(2);
onActivate.onNext(false);
onNext.onNext(3);
subscriber.assertNoErrors();
subscriber.assertNotCompleted();
subscriber.assertValueCount(1);
subscriber.assertValue(2);
}
@Test
public void unsubscribe_active() throws Exception {
onActivate.onNext(true);
onNext.onNext(1);
onNext.onNext(2);
subscriber.unsubscribe();
assertFalse(onActivate.hasObservers());
assertFalse(onNext.hasObservers());
subscriber.assertUnsubscribed();
subscriber.assertNoErrors();
subscriber.assertNotCompleted();
subscriber.assertValueCount(2);
subscriber.assertValues(1, 2);
}
@Test
public void unsubscribe_inactive() throws Exception {
onNext.onNext(1);
onNext.onNext(2);
subscriber.unsubscribe();
assertFalse(onActivate.hasObservers());
assertFalse(onNext.hasObservers());
subscriber.assertUnsubscribed();
subscriber.assertNoErrors();
subscriber.assertNotCompleted();
subscriber.assertNoValues();
}
@Test
public void usable_with_multiple_subscribers_active() throws Exception {
TestSubscriber<Integer> subscriber2 = TestSubscriber.create();
observable.subscribe(subscriber2);
onActivate.onNext(true);
onNext.onNext(1);
onNext.onNext(2);
onActivate.onNext(false);
onNext.onNext(3);
subscriber2.unsubscribe();
onActivate.onNext(true);
onNext.onNext(4);
subscriber.assertNoErrors();
subscriber.assertNotCompleted();
subscriber.assertValues(1, 2, 4);
subscriber2.assertUnsubscribed();
subscriber2.assertNoErrors();
subscriber2.assertNotCompleted();
subscriber2.assertValues(1, 2);
}
@Test
public void usable_with_multiple_subscribers_inactive() throws Exception {
TestSubscriber<Integer> subscriber2 = TestSubscriber.create();
onActivate.onNext(true);
observable.subscribe(subscriber2);
onNext.onNext(1);
onNext.onNext(2);
onActivate.onNext(false);
onNext.onNext(3);
subscriber2.unsubscribe();
onActivate.onNext(true);
onNext.onNext(4);
subscriber.assertNoErrors();
subscriber.assertNotCompleted();
subscriber.assertValues(1, 2, 4);
subscriber2.assertUnsubscribed();
subscriber2.assertNoErrors();
subscriber2.assertNotCompleted();
subscriber2.assertNoValues();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment