Skip to content

Instantly share code, notes, and snippets.

@samhendley
Created July 19, 2013 20:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save samhendley/6042034 to your computer and use it in GitHub Desktop.
Save samhendley/6042034 to your computer and use it in GitHub Desktop.
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.observables.ConnectableObservable;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;
/**
 * An {@link Observable} whose single result is supplied from the outside,
 * either as a value via {@link #onSuccess(Object)} or as a failure via
 * {@link #onError(Exception)}.
 *
 * <p>Instances are obtained from {@link #create()}. The exposed observable is
 * built with {@code replay()} and connected immediately, so it is intended
 * that subscribers attached after the result was set still receive it.
 *
 * <p>Exactly one terminal call ({@code onSuccess} or {@code onError}) is
 * allowed per instance. Misuse is reported through {@code assert}, so the
 * checks are only active when the JVM runs with assertions enabled
 * ({@code -ea}). No synchronization is performed by this class.
 *
 * @param <T> type of the value delivered on success
 */
public class SettableObservable<T> {

    private final Observer<T> observer;
    private final Observable<T> observable;

    /**
     * Set once either terminal method has been invoked. A single flag is used
     * for both outcomes so that success-after-error and error-after-success
     * are rejected as well as plain double calls.
     */
    private boolean terminated = false;

    private SettableObservable(Observer<T> observer, Observable<T> observable) {
        assert observable != null;
        assert observer != null;
        this.observer = observer;
        this.observable = observable;
    }

    /**
     * Returns the observable that consumers subscribe to.
     *
     * @return the observable backing this instance; never {@code null} when
     *         assertions are enabled
     */
    public Observable<T> getObservable() {
        return observable;
    }

    /**
     * Completes this instance successfully: emits {@code args} followed by
     * the completion notification.
     *
     * @param args the value to emit
     * @throws AssertionError if assertions are enabled and this instance was
     *         already completed by {@code onSuccess} or {@code onError}
     */
    public void onSuccess(T args) {
        assert !terminated : "onSuccess called after the observable was already terminated";
        terminated = true;
        observer.onNext(args);
        observer.onCompleted();
    }

    /**
     * Completes this instance with a failure by forwarding {@code e} to the
     * underlying observer.
     *
     * @param e the failure to propagate
     * @throws AssertionError if assertions are enabled and this instance was
     *         already completed by {@code onSuccess} or {@code onError}
     */
    public void onError(Exception e) {
        assert !terminated : "onError called after the observable was already terminated";
        terminated = true;
        observer.onError(e);
    }

    /**
     * Subscription function that just remembers the {@link Observer} it is
     * handed, so that {@link SettableObservable} can push notifications into
     * it later. It expects to be invoked at most once.
     */
    private static class ObserverGrabber<T> implements Func1<Observer<T>, Subscription> {

        private Observer<T> observer;

        @Override
        public Subscription call(final Observer<T> observer) {
            assert this.observer == null;
            this.observer = observer;
            // Nothing to tear down: emission is driven externally.
            return Subscriptions.empty();
        }

        private Observer<T> getObserver() {
            assert observer != null;
            return observer;
        }
    }

    /**
     * Creates a new, not yet completed instance.
     *
     * @param <T> type of the value delivered on success
     * @return a fresh {@code SettableObservable}
     */
    public static <T> SettableObservable<T> create() {
        ObserverGrabber<T> grabber = new ObserverGrabber<T>();
        ConnectableObservable<T> connected = Observable.create(grabber).replay();
        // connect() must have invoked the grabber by the time it returns;
        // getObserver() below asserts that an observer was captured.
        connected.connect();
        return new SettableObservable<T>(grabber.getObserver(), connected);
    }
}
import org.junit.Test;
import rx.util.functions.Action1;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
/**
 * Tests for {@link SettableObservable}.
 */
public class SettableObservableTest {

    /**
     * Verifies that a value set through {@code onSuccess} reaches both a
     * subscriber registered before the call and one registered afterwards,
     * and that a second {@code onSuccess} is rejected with an
     * {@link AssertionError}.
     */
    @Test
    public void testSuccess() throws Exception {
        SettableObservable<Integer> intFuture = SettableObservable.create();
        final AtomicInteger beforePublishSubscription = new AtomicInteger(0);
        final AtomicInteger afterPublishSubscription = new AtomicInteger(0);

        intFuture.getObservable().subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                beforePublishSubscription.set(integer);
            }
        });

        intFuture.onSuccess(243);

        intFuture.getObservable().subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                afterPublishSubscription.set(integer);
            }
        });

        assertEquals(243, beforePublishSubscription.get());
        assertEquals(243, afterPublishSubscription.get());

        // fail() itself throws AssertionError, so it must not be called inside
        // the try block that catches AssertionError; otherwise the failure
        // would be swallowed and this check could never fail.
        boolean secondCallRejected = false;
        try {
            intFuture.onSuccess(223432443);
        } catch (AssertionError expected) {
            secondCallRejected = true;
        }
        if (!secondCallRejected) {
            fail("should only call onSuccessOnce");
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment