Skip to content

Instantly share code, notes, and snippets.

@sindrenm
Created December 14, 2016 12:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sindrenm/c12c91e753cd0cf7ec8d54792974a6a4 to your computer and use it in GitHub Desktop.
Save sindrenm/c12c91e753cd0cf7ec8d54792974a6a4 to your computer and use it in GitHub Desktop.
An event bus written in Kotlin using RxJava
import rx.Observable
import rx.Subscription
import rx.subjects.PublishSubject
import rx.subjects.SerializedSubject
import rx.subscriptions.CompositeSubscription
/**
 * Application-wide event bus built on an RxJava subject.
 *
 * Events of any type are pushed in through [send]; consumers obtain a typed
 * stream through [observe]. Subscriptions can additionally be grouped per
 * subscriber via [register] so they can all be released in one call to
 * [unregister].
 */
object EventBus {

    /**
     * Subject that carries every event: a [PublishSubject] wrapped in a
     * [SerializedSubject]. It is public because the inline [observe] reads it.
     */
    val bus = SerializedSubject(PublishSubject.create<Any>())

    /** Subscriptions grouped by the subscriber object they were registered for. */
    private val subscriptions = mutableMapOf<Any, CompositeSubscription>()

    /** Pushes [event] onto the bus. */
    fun send(event: Any): Unit = bus.onNext(event)

    /** Returns the bus filtered down to events that are instances of [T]. */
    inline fun <reified T : Any> observe(): Observable<T> {
        return bus.ofType(T::class.java)
    }

    /**
     * Clears every subscription tracked for [subscriber] and forgets the
     * subscriber. Does nothing when [subscriber] has no tracked subscriptions.
     */
    fun unregister(subscriber: Any) {
        subscriptions[subscriber]?.let { tracked ->
            tracked.clear()
            subscriptions.remove(subscriber)
        }
    }

    /**
     * Adds [subscription] to the group tracked for [subscriber], creating the
     * group on first use.
     */
    fun register(subscriber: Any, subscription: Subscription) {
        subscriptions
            .getOrPut(subscriber) { CompositeSubscription() }
            .add(subscription)
    }
}
/**
 * Registers this subscription on [EventBus] under [subscriber], so that a later
 * `EventBus.unregister(subscriber)` clears it together with the subscriber's
 * other tracked subscriptions.
 *
 * @param subscriber the object that owns this subscription.
 */
fun Subscription.register(subscriber: Any) {
    // The bus declared in this file is EventBus; the previous target, NetworkBus,
    // is not defined or imported here.
    EventBus.register(subscriber, this)
}
@IgorGanapolsky
Copy link

Is this RxJava 1 or 2?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment