Skip to content

Instantly share code, notes, and snippets.

@imran0101
Last active May 31, 2017 06:02
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save imran0101/e6d213af4dedb48efa36bc64ce18f903 to your computer and use it in GitHub Desktop.
Save imran0101/e6d213af4dedb48efa36bc64ce18f903 to your computer and use it in GitHub Desktop.
An EventBus built on RxJava: a serialized PublishSubject that events are sent to, and per-type observables for subscribing to them.
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
/**
 * A simple event bus: objects passed to {@link #send(Object)} are pushed into a
 * serialized {@link PublishSubject}, and {@link #observable(Class)} exposes the
 * events of one concrete class as an {@link Observable}.
 * courtesy https://gist.github.com/benjchristensen/04eef9ca0851f3a5d7bf
 *
 * @hide
 */
public class EventBus {
    /** Subject every event is published to; wrapped in a SerializedSubject so onNext calls are serialized. */
    final SerializedSubject<Object, Object> serializedSubject = new SerializedSubject<>(PublishSubject.create());

    /**
     * Lazily created shared instance. Declared volatile because it is read
     * outside the lock in {@link #get()}; without it another thread could
     * observe a non-null reference before construction is visible.
     */
    static volatile EventBus singleton = null;

    EventBus() {
    }

    /**
     * Returns the shared EventBus, creating it on first use.
     *
     * @return the single shared instance
     */
    public static EventBus get() {
        // Read the volatile field once into a local for the common, already-initialised path.
        EventBus bus = singleton;
        if (bus == null) {
            synchronized (EventBus.class) {
                bus = singleton;
                if (bus == null) {
                    bus = new EventBus();
                    singleton = bus;
                }
            }
        }
        return bus;
    }

    /**
     * Send events.
     *
     * @param t   the event object to publish to all current subscribers
     * @param <T> type of the event
     */
    public <T> void send(T t) {
        serializedSubject.onNext(t);
    }

    /**
     * Observe events sent.
     *
     * <p>Only events whose runtime class is exactly {@code cls} are delivered;
     * instances of subclasses are not. Null events are ignored.
     *
     * @param cls the exact event class to listen for
     * @param <T> type of the event
     * @return an Observable emitting the matching events; each subscription to
     *         it holds its own subscription to the underlying subject
     */
    public <T> Observable<T> observable(final Class<T> cls) {
        return Observable.create(new Observable.OnSubscribe<T>() {
            @Override
            public void call(final Subscriber<? super T> subscriber) {
                Subscription upstream = serializedSubject.subscribe(new Subscriber<Object>() {
                    @Override
                    public void onCompleted() {
                        subscriber.onCompleted();
                    }

                    @Override
                    public void onError(Throwable e) {
                        subscriber.onError(e);
                    }

                    @SuppressWarnings("unchecked")
                    @Override
                    public void onNext(Object o) {
                        // Null guard: o.getClass() would otherwise throw for send(null).
                        // The cast is safe because the runtime class equals cls.
                        if (o != null && cls.equals(o.getClass())) {
                            subscriber.onNext((T) o);
                        }
                    }
                });
                // Tie the subject subscription to THIS downstream subscriber so it is
                // released when that subscriber unsubscribes. A slot shared by the
                // whole Observable would be overwritten by a second subscriber and
                // leak the first subscription.
                subscriber.add(upstream);
            }
        });
    }
}
import rx.Subscription;
import rx.functions.Action1;
/**
 * Demonstrates the EventBus: registers one subscriber per event type,
 * publishes events numbered 1 through 9 for each type, then unsubscribes.
 */
public class Sample {

    /** Basic event carrying an integer payload. */
    static class Event {
        int data;

        public Event(int data) {
            this.data = data;
        }
    }

    /** A second, unrelated event type with an integer payload. */
    static class AnotherEvent {
        int data;

        public AnotherEvent(int data) {
            this.data = data;
        }
    }

    /** Subclass of {@link Event}; reuses the inherited payload field. */
    static class ExtendedEvent extends Event {
        public ExtendedEvent(int data) {
            super(data);
        }
    }

    public static void main(String[] args) {
        final EventBus bus = EventBus.get();

        // Register one listener per event type.
        Subscription eventSub = bus.observable(Event.class).subscribe(new Action1<Event>() {
            @Override
            public void call(Event received) {
                System.out.println("Event : " + received.data);
            }
        });

        Subscription anotherSub = bus.observable(AnotherEvent.class).subscribe(new Action1<AnotherEvent>() {
            @Override
            public void call(AnotherEvent received) {
                System.out.println("AnotherEvent : " + received.data);
            }
        });

        Subscription extendedSub = bus.observable(ExtendedEvent.class).subscribe(new Action1<ExtendedEvent>() {
            @Override
            public void call(ExtendedEvent received) {
                System.out.println("ExtendedEvent : " + received.data);
            }
        });

        // Publish events 1..9 of each type, in the same interleaved order.
        for (int n = 1; n <= 9; n++) {
            bus.send(new Event(n));
            bus.send(new AnotherEvent(n));
            bus.send(new ExtendedEvent(n));
        }

        // Release the listeners in registration order.
        for (Subscription s : new Subscription[] {eventSub, anotherSub, extendedSub}) {
            unsubscribe(s);
        }
    }

    /**
     * Unsubscribes the given subscription if it exists and is still active.
     *
     * @param subscription the subscription to release; may be null
     */
    static void unsubscribe(Subscription subscription) {
        if (subscription == null || subscription.isUnsubscribed()) {
            return;
        }
        subscription.unsubscribe();
    }
}
@imran0101
Copy link
Author

imran0101 commented May 21, 2016

With help from @madhu314

Program output.

Event : 1
AnotherEvent : 1
ExtendedEvent : 1
Event : 2
AnotherEvent : 2
ExtendedEvent : 2
Event : 3
AnotherEvent : 3
ExtendedEvent : 3
Event : 4
AnotherEvent : 4
ExtendedEvent : 4
Event : 5
AnotherEvent : 5
ExtendedEvent : 5
...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment