Skip to content

Instantly share code, notes, and snippets.

@akarnokd
Created April 22, 2015 09:02
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save akarnokd/16e8e30d21e2ff5349e7 to your computer and use it in GitHub Desktop.
Save akarnokd/16e8e30d21e2ff5349e7 to your computer and use it in GitHub Desktop.
import java.util.*;
import rx.*;
import rx.Observable.Operator;
import rx.Observable;
import rx.observers.TestSubscriber;
import rx.subjects.*;
/**
 * A small event bus built on top of an RxJava {@link PublishSubject}.
 *
 * <p>Both values and errors are posted as {@link Notification} items through the subject's
 * {@code onNext}; this class never calls {@code onError} or {@code onCompleted} on the subject
 * itself. The stream handed out by {@link #observe()} dematerializes those notifications, so a
 * posted error reaches the current subscribers as a regular {@code onError}, while
 * {@link #main(String[])} verifies that a subscriber attached afterwards still receives later
 * values.
 *
 * @param <T> the type of the values carried by the bus
 */
public class RxEventBus<T> {

    /** Carries every posted value and error as a notification; wrapped via {@code toSerialized()}. */
    private final Subject<Notification<T>, Notification<T>> subject;

    /** The dematerialized, client-error-shielded view of {@link #subject} returned by {@link #observe()}. */
    private final Observable<T> observable;

    /** Creates an empty bus with no subscribers. */
    public RxEventBus() {
        subject = PublishSubject.<Notification<T>>create().toSerialized();
        observable = subject.<T>dematerialize().lift(new ClientErrorBounceBack<T>());
    }

    /**
     * Posts a value to the bus.
     *
     * @param value the value to wrap in an OnNext notification and emit
     */
    public void post(T value) {
        subject.onNext(Notification.createOnNext(value));
    }

    /**
     * Posts an error to the bus. The error is wrapped in a notification and emitted through
     * the subject's {@code onNext}, so the subject itself is not terminated by this call.
     *
     * @param e the error to deliver; must not be {@code null}
     * @throws NullPointerException if {@code e} is {@code null}
     */
    public void postError(Throwable e) {
        // Fail fast at the call site: a null error would otherwise be wrapped and pushed
        // downstream, where it can only surface later and far away from the faulty caller.
        Objects.requireNonNull(e, "e is null");
        subject.onNext(Notification.createOnError(e));
    }

    /**
     * Returns the stream of posted values and errors.
     *
     * @return the shared {@link Observable}; the same instance is returned on every call
     */
    public Observable<T> observe() {
        return observable;
    }

    /**
     * Operator that captures exceptions thrown by a downstream {@code onNext} and hands them to
     * that same downstream's {@code onError}, so the exception does not disrupt the upstream
     * {@link PublishSubject}.
     *
     * <p>Exceptions thrown by the downstream's {@code onError} or {@code onCompleted} are not
     * caught here; they propagate to the caller after the wrapper has unsubscribed itself.
     *
     * @param <T> the type of the values passing through the operator
     */
    static final class ClientErrorBounceBack<T> implements Operator<T, T> {

        @Override
        public Subscriber<? super T> call(final Subscriber<? super T> child) {
            // NOTE(review): the 'false' flag is RxJava's shareSubscriptions argument, i.e. the
            // wrapper keeps its own subscription list instead of sharing the child's - confirm
            // against the rx.Subscriber constructor documentation.
            Subscriber<T> bouncer = new Subscriber<T>(child, false) {

                /** Set once a terminal event has been forwarded; later events are dropped. */
                boolean done;

                @Override
                public void onNext(T t) {
                    if (done) {
                        return;
                    }
                    try {
                        child.onNext(t);
                    } catch (Throwable e) {
                        // Bounce the client's own failure back to the client only.
                        // NOTE(review): this also captures JVM-fatal errors; consider
                        // rx.exceptions.Exceptions.throwIfFatal(e) if that is undesired.
                        onError(e);
                    }
                }

                @Override
                public void onError(Throwable e) {
                    if (done) {
                        return;
                    }
                    done = true;
                    try {
                        child.onError(e);
                    } finally {
                        // Detach this wrapper even if the child's onError throws.
                        unsubscribe();
                    }
                }

                @Override
                public void onCompleted() {
                    if (done) {
                        return;
                    }
                    done = true;
                    try {
                        child.onCompleted();
                    } finally {
                        // Detach this wrapper even if the child's onCompleted throws.
                        unsubscribe();
                    }
                }
            };
            // Register the wrapper with the child so it is tracked by the child's subscriptions.
            child.add(bouncer);
            return bouncer;
        }
    }

    /**
     * Throws an {@link AssertionError} with the given message when the subscriber has not
     * recorded any {@code onError} event.
     */
    private static void requireError(TestSubscriber<?> ts, String message) {
        if (ts.getOnErrorEvents().isEmpty()) {
            throw new AssertionError(message);
        }
    }

    /**
     * Self-check that exercises the bus: shared delivery of values and errors, late subscribers
     * after a posted error, and isolation of a subscriber whose {@code onNext} throws.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        RxEventBus<Integer> eb = new RxEventBus<>();

        TestSubscriber<Integer> ts1 = new TestSubscriber<>();
        TestSubscriber<Integer> ts2 = new TestSubscriber<>();
        eb.observe().subscribe(ts1);
        eb.observe().subscribe(ts2);

        eb.post(1);
        eb.postError(new Exception());

        // See if both received the value and the error
        ts1.assertTerminalEvent();
        ts1.assertReceivedOnNext(Arrays.asList(1));
        requireError(ts1, "No exception received?");

        ts2.assertTerminalEvent();
        ts2.assertReceivedOnNext(Arrays.asList(1));
        requireError(ts2, "No exception received?");

        // See if further subscribers aren't affected by formerly posted errors
        TestSubscriber<Integer> ts3 = new TestSubscriber<>();
        eb.observe().subscribe(ts3);
        eb.post(2);
        ts3.assertReceivedOnNext(Arrays.asList(2));

        // See if a client error doesn't affect others
        TestSubscriber<Integer> ts4 = new TestSubscriber<Integer>() {
            @Override
            public void onNext(Integer t) {
                throw new RuntimeException();
            }
        };
        eb.observe().subscribe(ts4);
        eb.post(3);
        eb.post(4);

        ts3.assertReceivedOnNext(Arrays.asList(2, 3, 4));

        ts4.assertTerminalEvent();
        ts4.assertReceivedOnNext(Collections.emptyList());
        requireError(ts4, "No errors received!");

        System.out.println("Success!");
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment