Skip to content

Instantly share code, notes, and snippets.

@jaredsburrows
Last active March 16, 2023 10:44
Show Gist options
  • Star 31 You must be signed in to star a gist
  • Fork 7 You must be signed in to fork a gist
  • Save jaredsburrows/e9706bd8c7d587ea0c1114a0d7651d13 to your computer and use it in GitHub Desktop.
Save jaredsburrows/e9706bd8c7d587ea0c1114a0d7651d13 to your computer and use it in GitHub Desktop.
RxBus for RxJava 1 and RxJava 2
import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;
/**
* @author <a href="mailto:jaredsburrows@gmail.com">Jared Burrows</a>
*/
public final class RxBus {
private final Subject<Object, Object> bus = new SerializedSubject<>(PublishSubject.create());
public void send(final Object event) {
bus.onNext(event);
}
public Observable<Object> toObservable() {
return bus;
}
public boolean hasObservers() {
return bus.hasObservers();
}
}
import com.jakewharton.rxrelay.PublishRelay;
import com.jakewharton.rxrelay.Relay;
import rx.Observable;
/**
* @author <a href="mailto:jaredsburrows@gmail.com">Jared Burrows</a>
*/
public final class RxBus {
private final Relay<Object, Object> bus = PublishRelay.create().toSerialized();
public void send(final Object event) {
bus.call(event);
}
public Observable<Object> toObservable() {
return bus;
}
public boolean hasObservers() {
return bus.hasObservers();
}
}
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
/**
* @author <a href="mailto:jaredsburrows@gmail.com">Jared Burrows</a>
*/
public final class RxBus {
private final PublishSubject<Object> bus = PublishSubject.create();
public void send(final Object event) {
bus.onNext(event);
}
public Observable<Object> toObservable() {
return bus;
}
public boolean hasObservers() {
return bus.hasObservers();
}
}
import com.jakewharton.rxrelay2.PublishRelay;
import com.jakewharton.rxrelay2.Relay;
import io.reactivex.Observable;
/**
* @author <a href="mailto:jaredsburrows@gmail.com">Jared Burrows</a>
*/
public final class RxBus {
private final Relay<Object> bus = PublishRelay.create().toSerialized();
public void send(Object event) {
bus.accept(event);
}
public Observable<Object> toObservable() {
return bus;
}
public boolean hasObservers() {
return bus.hasObservers();
}
}
@PaulWoitaschek
Copy link

For my own bus I mostly subscribe to one specific event as that turned out to make clearer separation:

public <T> Observable<T> event(Class<T> klazz) {
    return mBus.ofType(klazz);
}

@igoticecream
Copy link

I think is better to use RxRelay from Jake Wharton... because if onError occurs, the bus is dead.

@jaredsburrows
Copy link
Author

@PaulWoitaschek That is great for filtering events to listen for.

@igoticecream Yeah. I'll update the gists with https://github.com/JakeWharton/RxRelay.

@XWC95
Copy link

XWC95 commented Nov 29, 2016

A activity send msg , B activity can receive msg?

@jaredsburrows
Copy link
Author

@xwc520 You can do that with the RxBus.

@herrbert74
Copy link

How do you unsubscribe?

@OleksandrKucherenko
Copy link

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment