Skip to content

Instantly share code, notes, and snippets.

// don't start emitting items just yet by turning the observable into a connectable one
ConnectableObservable<Object> tapEventEmitter = _rxBus.toObserverable().publish();
tapEventEmitter.publish((Func1) (stream) -> {
// inside `publish`, "stream" is truly multicasted
// applying the same technique for getting a debounced buffer sequence
return stream.buffer(stream.debounce(1, TimeUnit.SECONDS));
Observable<Object> tapEventEmitter = _rxBus.toObserverable().share();
Observable<Object> debouncedEventEmitter = tapEventEmitter.debounce(1, TimeUnit.SECONDS);
tapEventEmitter.buffer(debouncedEventEmitter)
Observable<Object> tapEventEmitter = _rxBus.toObserverable().share();
// which is really the same as:
Observable<Object> tapEventEmitter = _rxBus.toObserverable().publish().refcount();
Observable<Object> tapEventEmitter = _rxBus.toObserverable().share();
Observable<Object> debouncedEventEmitter = tapEventEmitter.debounce(1, TimeUnit.SECONDS);
Observable<List<Object>> debouncedBufferEmitter = tapEventEmitter.buffer(debouncedEventEmitter);
debouncedBufferEmitter.buffer(debouncedEventEmitter)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<List<Object>>() {
@Override
public void call(List<Object> taps) {
_showTapCount(taps.size());
@kaushikgopal
kaushikgopal / implement_bus_3.java
Created June 26, 2016 05:37
Blog post snippet for implementing event bus
// note that it is important to subscribe to the exact same _rxBus instance that was used to post the events
_rxBus.toObserverable()
.subscribe(new Action1<Object>() {
@Override
public void call(Object event) {
if(event instanceof TapEvent) {
_showTapText();
}else if(event instanceof SomeOtherEvent) {
@kaushikgopal
kaushikgopal / implement_bus_2.java
Created June 26, 2016 05:37
Blog post snippet for implementing event bus
/**
 * Click handler registered via {@code @OnClick} for {@code R.id.btn_demo_rxbus_tap}.
 * Creates a fresh {@code TapEvent} and hands it to {@code _rxBus.send}.
 */
@OnClick(R.id.btn_demo_rxbus_tap)
public void onTapButtonClicked() {
    // One new event instance per tap.
    final TapEvent tapEvent = new TapEvent();
    _rxBus.send(tapEvent);
}
@kaushikgopal
kaushikgopal / implement_bus_1.java
Created June 26, 2016 05:36
Blog post snippet for implementing event bus
// this is the middleman object
public class RxBus {
private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create());
/**
 * Posts an event on the bus by forwarding it to the backing subject's {@code onNext}.
 *
 * @param o the event object to emit; passed through unchanged
 */
public void send(Object o) {
_bus.onNext(o);
}
public Observable<Object> toObserverable() {
@kaushikgopal
kaushikgopal / threading_3.java
Created June 25, 2016 21:45
Blog post snippet
// the below code is inside a TextWatcher
// which implements the onTextChanged method
// I've simplified it to only highlight the parts we're
// interested in
private long lastChange = 0;
@Override
public void onTextChanged(final CharSequence chars,
@kaushikgopal
kaushikgopal / threading_2.java
Created June 25, 2016 21:44
Blog post snippet
public class TestActivity extends Activity {
// ...
// all standard stuff
@Override
public void onCreate(Bundle savedInstanceState) {
// ...
// all standard stuff
@kaushikgopal
kaushikgopal / threading_1.java
Created June 25, 2016 21:39
Blog post snippet primer on threading
// Version 1
public class IAmAThread extends Thread {
/** Creates the thread, passing the fixed name "IAmAThread" to the {@code Thread} superclass constructor. */
public IAmAThread() {
super("IAmAThread");
}
@Override
public void run() {
// your code (sequence of instructions)