Skip to content

Instantly share code, notes, and snippets.

@vuhung3990
Created February 2, 2016 02:27
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vuhung3990/d1143fb3b05c2cef3112 to your computer and use it in GitHub Desktop.
Save vuhung3990/d1143fb3b05c2cef3112 to your computer and use it in GitHub Desktop.
RxBus replace of eventbus
// onCreate
sub = RxBus.getInstance().subscribe(Integer.class, new Subscriber<Object>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: ");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onNext(Object o) {
Log.d(TAG, "onNext: " + o);
}
});
// onDestroy
RxBus.getInstance().unSubscribe(sub);
package com.example.hungvu.testpack;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Func1;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;
/**
* RxBus to replace event bus
* <pre>
* Example:
* onCreate()
* subscription = RxBus.getInstance().subscribe...
*
* onDestroy()
* RxBus.getInstance().unSubscribe(subscription);
* </pre>
*/
public class RxBus {
private static RxBus instance;
private final Subject<Object, Object> _bus = PublishSubject.create();
public static RxBus getInstance() {
if (instance == null) {
instance = new RxBus();
}
return instance;
}
/**
* send event object
*
* @param o object to send (string, int, object ...)
*/
public void send(Object o) {
_bus.onNext(o);
}
/**
* @return The Observable class that implements the Reactive Pattern.
* This class provides methods for subscribing to the Observable as well as delegate methods to the various Observers
*/
public Observable<Object> toObserverable() {
return _bus;
}
/**
* @param filter type of return object, null to skip
* @param subcriber Provides a mechanism for receiving push-based notifications from Observables, and permits manual un-subscribing from these Observables
* @return Subscription returns from Observable.subscribe(Subscriber) to allow un-subscribing
* @see #unSubscribe(Subscription)
*/
public Subscription subscribe(final Class filter, Subscriber<? super Object> subcriber) {
return _bus.filter(new Func1<Object, Boolean>() {
@Override
public Boolean call(Object o) {
return filter != null ? filter.isInstance(o) : true;
}
}).subscribe(subcriber);
}
/**
* un-subscribe to avoid leak memory
*
* @param subscription Subscription returns from Observable.subscribe(Subscriber) to allow un-subscribing
* @see #subscribe(Class, Subscriber)
*/
public void unSubscribe(Subscription subscription) {
if (subscription != null && subscription.isUnsubscribed())
subscription.unsubscribe();
}
}
@rskumar
Copy link

rskumar commented Feb 5, 2016

Thanks, I was just searching for one with unSubscribe() to avoid memory leak. For fragment, where should I subscribe and unsubscribe?

@vuhung3990
Copy link
Author

onResume -> subscribe, onPause -> unsubscribe, you can check it out here: https://github.com/googlesamples/android-architecture/tree/todo-mvp-rxjava/

@Fgabz
Copy link

Fgabz commented Mar 22, 2017

@vuhung3990 I think your unSubscribe method is missing something : if (subscription != null && !subscription.isUnsubscribed()) {} Otherwise it won't actually unsubscribe the observer.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment