Created
July 19, 2013 20:16
-
-
Save samhendley/6042034 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rx.Observable; | |
import rx.Observer; | |
import rx.Subscription; | |
import rx.observables.ConnectableObservable; | |
import rx.subscriptions.Subscriptions; | |
import rx.util.functions.Func1; | |
public class SettableObservable<T> { | |
private final Observer<T> observer; | |
private final Observable<T> observable; | |
private boolean successCalled = false; | |
private boolean exceptionCalled = false; | |
private SettableObservable(Observer<T> observer, Observable<T> observable) { | |
assert observable != null; | |
assert observer != null; | |
this.observer = observer; | |
this.observable = observable; | |
} | |
public Observable<T> getObservable() { | |
return observable; | |
} | |
public void onSuccess(T args) { | |
assert !successCalled; | |
successCalled = true; | |
observer.onNext(args); | |
observer.onCompleted(); | |
} | |
public void onError(Exception e) { | |
assert !exceptionCalled; | |
exceptionCalled = true; | |
observer.onError(e); | |
} | |
private static class ObserverGrabber<T> implements Func1<Observer<T>, Subscription> { | |
private Observer<T> observer; | |
@Override | |
public Subscription call(final Observer<T> observer) { | |
assert this.observer == null; | |
this.observer = observer; | |
return Subscriptions.empty(); | |
} | |
private Observer<T> getObserver() { | |
assert observer != null; | |
return observer; | |
} | |
} | |
public static <T> SettableObservable<T> create() { | |
ObserverGrabber<T> grabber = new ObserverGrabber<T>(); | |
ConnectableObservable<T> connected = Observable.create(grabber).replay(); | |
connected.connect(); | |
return new SettableObservable<T>(grabber.getObserver(), connected); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.junit.Test; | |
import rx.util.functions.Action1; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import static org.junit.Assert.assertEquals; | |
import static org.junit.Assert.fail; | |
public class SettableObservableTest { | |
@Test | |
public void testSuccess() throws Exception { | |
SettableObservable<Integer> intFuture = SettableObservable.create(); | |
final AtomicInteger beforePublishSubscription = new AtomicInteger(0); | |
final AtomicInteger afterPublishSubscription = new AtomicInteger(0); | |
intFuture.getObservable().subscribe(new Action1<Integer>() { | |
@Override | |
public void call(Integer integer) { | |
beforePublishSubscription.set(integer); | |
} | |
}); | |
intFuture.onSuccess(243); | |
intFuture.getObservable().subscribe(new Action1<Integer>() { | |
@Override | |
public void call(Integer integer) { | |
afterPublishSubscription.set(integer); | |
} | |
}); | |
assertEquals(243, beforePublishSubscription.get()); | |
assertEquals(243, afterPublishSubscription.get()); | |
try { | |
intFuture.onSuccess(223432443); | |
fail("should only call onSuccessOnce"); | |
} catch (AssertionError ae) { | |
// expected error | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment