Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
RxBus with Sticky Events
/**
* An RxJava-backed EventBus class that can support sending and receiving multiple event types.
*
* Based on https://gist.github.com/benjchristensen/04eef9ca0851f3a5d7bf
*/
public class EventBus<T> {
private static EventBus<Object> INSTANCE;
private List<T> events;
private Subject<T, T> publishSubject =
new SerializedSubject<>(PublishSubject.<T>create());
public static EventBus getInstance() {
if (INSTANCE == null) {
INSTANCE = new EventBus<>();
}
return INSTANCE;
}
public EventBus() {
events = new ArrayList<>();
}
/**
* Sends an event that will only be consumed by real-time subscribers
* @param update
* @param <E>
*/
public <E extends T> void send(E update) {
publishSubject.onNext(update);
}
/**
* Sends an event that will be stored until consumed. For example, if a class
* subscribes to this event long after it is fired, it can still consume this event
* by calling asObservable().
* @param update
* @param <E>
*/
public <E extends T> void sendSticky(E update) {
events.add(update);
publishSubject.onNext(update);
}
public <E extends T> void removeSticky(E update) {
events.remove(update);
}
public <E extends T> Observable<E> asObservable(Class<E> eventClass) {
return Observable.merge(asHotObservable(eventClass), asColdObservable(eventClass));
}
private <E extends T> Observable<E> asHotObservable(Class<E> eventClass) {
return publishSubject.ofType(eventClass);
}
private <E extends T> Observable<E> asColdObservable(Class<E> eventClass) {
return Observable.from(new ArrayList<>(events))
.ofType(eventClass);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.