Skip to content

Instantly share code, notes, and snippets.

@Orange168
Created April 10, 2016 08:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Orange168/81a3f2dc67bc2248b56b3697adcc601e to your computer and use it in GitHub Desktop.
Save Orange168/81a3f2dc67bc2248b56b3697adcc601e to your computer and use it in GitHub Desktop.
RxBusDemo
/**
* courtesy: https://gist.github.com/benjchristensen/04eef9ca0851f3a5d7bf
*/
public class RxBus {
//private final PublishSubject<Object> _bus = PublishSubject.create();
// If multiple threads are going to emit events to this
// then it must be made thread-safe like this instead
private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create());
public void send(Object o) {
_bus.onNext(o);
}
public Observable<Object> toObserverable() {
return _bus;
}
public boolean hasObservers() {
return _bus.hasObservers();
}
}
@Override
public void onStart() {
super.onStart();
_subscriptions = new CompositeSubscription();
ConnectableObservable<Object> tapEventEmitter = _rxBus.toObserverable().publish();
_subscriptions//
.add(tapEventEmitter.subscribe(new Action1<Object>() {
@Override
public void call(Object event) {
if (event instanceof RxBusDemoFragment.TapEvent) {
_showTapText();
}
}
}));
// tapEventEmitter.flatMap()
_subscriptions//
.add(tapEventEmitter.publish(new Func1<Observable<Object>, Observable<List<Object>>>() {
@Override
public Observable<List<Object>> call(Observable<Object> stream) {
return stream.buffer(stream.debounce(1, TimeUnit.SECONDS));
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<List<Object>>() {
@Override
public void call(List<Object> taps) {
_showTapCount(taps.size());
}
}));
_subscriptions.add(tapEventEmitter.connect());
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment