Skip to content

Instantly share code, notes, and snippets.

@AliYusuf95
Last active May 7, 2017 06:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save AliYusuf95/d7bc5e2f0b1f9871b6d773d2fd884aad to your computer and use it in GitHub Desktop.
Save AliYusuf95/d7bc5e2f0b1f9871b6d773d2fd884aad to your computer and use it in GitHub Desktop.
RxJava event bus implementation with subscribing and unsubscribing.
import android.support.annotation.NonNull;
import java.util.HashMap;
import java.util.Map;
import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Predicate;
import io.reactivex.subjects.PublishSubject;
/**
 * Static event bus built on a single RxJava 2 {@link PublishSubject}. Any object can
 * {@link #post(Object) post} an event; subscribers receive only the events that are instances
 * of the class they asked for. Useful for passing data between activities, fragments, etc.
 *
 * <p>Threading: no scheduler is applied by this class, so subscribers are invoked on whichever
 * thread calls {@link #post(Object)}. Callers that need a specific thread should use
 * {@link #observable(Class)} and apply {@code observeOn(...)} themselves. The class performs no
 * synchronization of its own.
 */
public final class RxBus {

    /** The one subject every event flows through; nothing in this class ever terminates it. */
    private static final PublishSubject<Object> bus = PublishSubject.create();

    /** Subscriptions grouped by the lifecycle object they were registered for. */
    private static final Map<Object, CompositeDisposable> sSubscriptionsMap = new HashMap<>();

    private RxBus() {
        // Static utility class, not meant to be instantiated.
    }

    /**
     * Returns the {@link CompositeDisposable} tracked for {@code object}, creating and storing
     * a new one if none exists yet.
     */
    @NonNull
    private static CompositeDisposable getCompositeSubscription(@NonNull Object object) {
        CompositeDisposable compositeSubscription = sSubscriptionsMap.get(object);
        if (compositeSubscription == null) {
            compositeSubscription = new CompositeDisposable();
            sSubscriptionsMap.put(object, compositeSubscription);
        }
        return compositeSubscription;
    }

    /**
     * Subscribes {@code action} to every posted event that is an instance of {@code eventClass}
     * and records the subscription under {@code lifecycle} so it can be released later.
     *
     * <p>Note: make sure to call {@link RxBus#unregister(Object)} with the same
     * {@code lifecycle} to avoid memory leaks.
     *
     * @param lifecycle  key the subscription is grouped under (e.g. the subscribing activity)
     * @param eventClass type of the events to receive
     * @param action     callback invoked with each matching event
     * @return the subscription, which may also be disposed individually
     */
    public static <T> Disposable subscribe(@NonNull Object lifecycle, @NonNull final Class<T> eventClass,
            @NonNull Consumer<? super T> action) {
        // Reuse observable() so the filtering rules live in exactly one place.
        Disposable subscription = observable(eventClass).subscribe(action);
        getCompositeSubscription(lifecycle).add(subscription);
        return subscription;
    }

    /**
     * Returns an {@link Observable} that emits only the posted events that are instances of
     * {@code eventClass}, already cast to that type.
     *
     * <p>Note: subscriptions made on the returned observable are not tracked by the bus; call
     * {@link Disposable#dispose()} on them yourself to avoid memory leaks.
     *
     * @param eventClass type of the events to receive
     */
    public static <T> Observable<T> observable(@NonNull final Class<T> eventClass) {
        // ofType == filter(eventClass::isInstance) + cast(eventClass). A separate null filter is
        // unnecessary: RxJava 2 streams never carry null items.
        return bus.ofType(eventClass);
    }

    /**
     * Unregisters {@code lifecycle} from the bus, disposing every subscription that was made
     * through {@link #subscribe(Object, Class, Consumer)} with it. Does nothing if the object
     * has no tracked subscriptions. Call this when the object is about to be destroyed.
     */
    public static void unregister(@NonNull Object lifecycle) {
        // The composite is removed from the map because a disposed CompositeDisposable
        // cannot be reused; a later subscribe() will create a fresh one.
        CompositeDisposable compositeSubscription = sSubscriptionsMap.remove(lifecycle);
        if (compositeSubscription != null) {
            compositeSubscription.dispose();
        }
    }

    /**
     * Posts {@code event} to all current subscribers whose event class matches it. Events posted
     * while nobody is subscribed are dropped.
     *
     * @param event the event to deliver; a {@code null} event is ignored
     */
    public static <T> void post(@NonNull T event) {
        // RxJava 2 does not allow null items, and handing one to the subject would fail (or even
        // terminate the bus on some versions). Drop it here, before it reaches the subject.
        if (event == null) {
            return;
        }
        bus.onNext(event);
    }
}
/**
 * Example usage: an activity that subscribes to {@code Event} objects on the bus when it is
 * created and releases its subscriptions when it is destroyed.
 */
public class RxBusActivity extends AppCompatActivity {

    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);

        // Subscribe this activity to Event; "this" is the key used to unregister later.
        RxBus.subscribe(this, Event.class, new Consumer<Event>() {
            @Override
            public void accept(Event event) throws Exception {
                // Handle the received event here.
            }
        });

        // Post an event on the bus.
        RxBus.post(new Event());
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        // Release every subscription registered with this activity to avoid a memory leak.
        // The bus API for this is unregister(Object); RxBus declares no unsubscribe method.
        RxBus.unregister(this);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment