Skip to content

Instantly share code, notes, and snippets.

@imran0101
Created May 22, 2016 10:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save imran0101/7b0a45c04c6500fd8b2c10c3cf8547c1 to your computer and use it in GitHub Desktop.
Save imran0101/7b0a45c04c6500fd8b2c10c3cf8547c1 to your computer and use it in GitHub Desktop.
EventBus with complete and error.
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
/**
* An object reference of EventBus
* courtesy https://gist.github.com/benjchristensen/04eef9ca0851f3a5d7bf
*
* @hide
*/
public class EventBus {
static class Completable {
Class clz;
public static Completable create(Class clz) {
Completable completable = new Completable();
completable.clz = clz;
return completable;
}
}
static class Error {
Throwable t;
Class clz;
public static Error create(Class clz, Throwable t) {
Error error = new Error();
error.clz = clz;
error.t = t;
return error;
}
}
SerializedSubject<Object, Object> serializedSubject = new SerializedSubject<>(PublishSubject.create());
static EventBus singleton = null;
EventBus() {
}
public static EventBus get() {
if (singleton == null) {
synchronized (EventBus.class) {
if (singleton == null) {
singleton = new EventBus();
}
}
}
return singleton;
}
/**
* Send events.
*/
public <T> void send(T t) {
serializedSubject.onNext(t);
}
public void complete(Class clz) {
serializedSubject.onNext(Completable.create(clz));
}
public void error(Class clz, Throwable throwable) {
serializedSubject.onNext(Error.create(clz, throwable));
}
/**
* Observe events sent.
*/
public <T> Observable<T> observable(final Class<T> cls) {
final Subscription[] holder = new Subscription[1];
Observable<T> obs = Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(final Subscriber<? super T> subscriber) {
Subscription subscription = serializedSubject.subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@SuppressWarnings("unchecked")
@Override
public void onNext(Object o) {
if (o instanceof Completable) {
if (cls.isAssignableFrom(((Completable) o).clz)) {
subscriber.onCompleted();
}
} else if (o instanceof Error) {
if (cls.isAssignableFrom(((Error) o).clz)) {
subscriber.onError(((Error) o).t);
}
} else if (cls.isAssignableFrom(o.getClass())) {
subscriber.onNext((T) o);
}
}
});
holder[0] = subscription;
}
}).doOnUnsubscribe(new Action0() {
@Override
public void call() {
if (holder[0] != null && !holder[0].isUnsubscribed())
holder[0].unsubscribe();
}
});
return obs;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment