Skip to content

Instantly share code, notes, and snippets.

@abliss
Created March 20, 2013 14:58
Show Gist options
  • Save abliss/5205333 to your computer and use it in GitHub Desktop.
Save abliss/5205333 to your computer and use it in GitHub Desktop.
A failing test.
diff --git a/rxjava-core/src/main/java/rx/operators/OperationConcat.java b/rxjava-core/src/main/java/rx/operators/OperationConcat.java
index c621002..b7c9fac 100644
--- a/rxjava-core/src/main/java/rx/operators/OperationConcat.java
+++ b/rxjava-core/src/main/java/rx/operators/OperationConcat.java
@@ -19,6 +19,7 @@ import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import java.lang.reflect.Array;
+import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@@ -193,8 +194,8 @@ public final class OperationConcat {
public void testConcatUnsubscribe() {
final CountDownLatch callOnce = new CountDownLatch(1);
final CountDownLatch okToContinue = new CountDownLatch(1);
- final TestObservable w1 = new TestObservable(null, null, "one", "two", "three");
- final TestObservable w2 = new TestObservable(callOnce, okToContinue, "four", "five", "six");
+ final TestObservable<String> w1 = new TestObservable<String>(null, null, "one", "two", "three");
+ final TestObservable<String> w2 = new TestObservable<String>(callOnce, okToContinue, "four", "five", "six");
@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
@@ -256,7 +257,40 @@ public final class OperationConcat {
Assert.assertEquals(expected.length, index);
}
- private static class TestObservable extends Observable<String> {
+ @Test
+ public void testBlockedObservableOfObservables() {
+ final String[] o = { "1", "3", "5", "7" };
+ final String[] e = { "2", "4", "6" };
+ final Observable<String> odds = Observable.toObservable(o);
+ final Observable<String> even = Observable.toObservable(e);
+ final CountDownLatch callOnce = new CountDownLatch(1);
+ final CountDownLatch okToContinue = new CountDownLatch(1);
+ TestObservable<Observable<String>> observableOfObservables = new TestObservable<Observable<String>>(callOnce, okToContinue, odds, even);
+ Func1<Observer<String>, Subscription> concatF = concat(observableOfObservables);
+ Observable<String> concat = Observable.create(concatF);
+ concat.subscribe(observer);
+ try {
+ // Block the main thread until the outer observable signals it has emitted its first inner observable (odds).
+ callOnce.await();
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ fail(ex.getMessage());
+ }
+ // The concatenated observable should have served up all of the odds, and nothing else yet.
+ Assert.assertEquals(o.length, index);
+ try {
+ // Unblock the outer observable so it can emit the second inner observable (even) and finish.
+ okToContinue.countDown();
+ observableOfObservables.t.join();
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ fail(ex.getMessage());
+ }
+ // The concatenated observable should now have served up the evens too: odds plus evens in total.
+ Assert.assertEquals(o.length + e.length, index);
+ }
+
+ private static class TestObservable<T> extends Observable<T> {
private final Subscription s = new Subscription() {
@@ -266,28 +300,28 @@ public final class OperationConcat {
}
};
- private final String[] values;
+ private final List<T> values;
private Thread t = null;
private int count = 0;
private boolean subscribed = true;
private final CountDownLatch once;
private final CountDownLatch okToContinue;
- public TestObservable(CountDownLatch once, CountDownLatch okToContinue, String... values) {
- this.values = values;
+ public TestObservable(CountDownLatch once, CountDownLatch okToContinue, T... values) {
+ this.values = Arrays.asList(values);
this.once = once;
this.okToContinue = okToContinue;
}
@Override
- public Subscription subscribe(final Observer<String> observer) {
+ public Subscription subscribe(final Observer<T> observer) {
t = new Thread(new Runnable() {
@Override
public void run() {
try {
- while (count < values.length && subscribed) {
- observer.onNext(values[count]);
+ while (count < values.size() && subscribed) {
+ observer.onNext(values.get(count));
count++;
//Unblock the main thread to call unsubscribe.
if (null != once)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment