Skip to content

Instantly share code, notes, and snippets.

@k2wanko
Last active May 29, 2017 15:15
Show Gist options
  • Save k2wanko/ba5fd3e5cabe742b251f to your computer and use it in GitHub Desktop.
Save k2wanko/ba5fd3e5cabe742b251f to your computer and use it in GitHub Desktop.
RxJava helper for Kotlin
package rx.kotlin
import rx.Observable
import rx.Subscriber
import rx.Subscription
/**
 * Wraps this [Observable] in a new [KSubscription] and registers [block] as its
 * error handler.
 *
 * This call only builds the wrapper; it does not subscribe to the observable.
 *
 * @param block callback to register for error notifications.
 * @return the newly created [KSubscription], so further handlers can be chained.
 */
public fun <T> Observable<T>.onError(block: (Throwable) -> Unit): KSubscription<T> =
    KSubscription(this).onError(block)
/**
 * Wraps this [Observable] in a new [KSubscription] and registers [block] as its
 * completion handler.
 *
 * This call only builds the wrapper; it does not subscribe to the observable.
 *
 * @param block callback to register for the completion notification.
 * @return the newly created [KSubscription], so further handlers can be chained.
 */
public fun <T> Observable<T>.onCompleted(block: () -> Unit): KSubscription<T> =
    KSubscription(this).onCompleted(block)
/**
 * Wraps this [Observable] in a new [KSubscription] and registers [block] as its
 * item handler.
 *
 * This call only builds the wrapper; it does not subscribe to the observable.
 *
 * @param block callback to register for emitted items.
 * @return the newly created [KSubscription], so further handlers can be chained.
 */
public fun <T> Observable<T>.onNext(block: (T) -> Unit): KSubscription<T> =
    KSubscription(this).onNext(block)
/**
 * Pass-through extension on [Subscription]: returns the receiver unchanged.
 *
 * NOTE: [block] is neither stored nor invoked by this function, so calling it
 * has no effect on how errors are handled.
 *
 * @param block unused.
 * @return the same [Subscription] instance this was called on.
 */
public fun Subscription.onError(block: (Throwable) -> Unit): Subscription = this
/**
 * Chainable holder for the three callbacks that [subscribe] forwards to an
 * rx [Subscriber] attached to [observable].
 *
 * Each `onXxx` setter replaces the corresponding callback and returns this same
 * instance. Defaults: items and completion are ignored, errors are rethrown.
 *
 * The callbacks are read from this object each time a notification arrives, so
 * replacing one after [subscribe] affects notifications delivered afterwards.
 *
 * @param observable the source that [subscribe] subscribes to.
 */
public class KSubscription<T>(val observable: Observable<T>) {
    // Item handler; does nothing until replaced via onNext().
    private var next: (T) -> Unit = {}

    // Completion handler; does nothing until replaced via onCompleted().
    private var completed: () -> Unit = {}

    // Error handler; rethrows the received throwable until replaced via onError().
    private var error: (Throwable) -> Unit = { throw it }

    /** Replaces the error handler with [block] and returns this instance. */
    fun onError(block: (Throwable) -> Unit): KSubscription<T> = apply { error = block }

    /** Replaces the completion handler with [block] and returns this instance. */
    fun onCompleted(block: () -> Unit): KSubscription<T> = apply { completed = block }

    /** Replaces the item handler with [block] and returns this instance. */
    fun onNext(block: (T) -> Unit): KSubscription<T> = apply { next = block }

    /**
     * Subscribes to [observable] with a [Subscriber] that delegates every
     * notification to the currently registered callbacks.
     *
     * @return the [Subscription] produced by `observable.subscribe`.
     */
    fun subscribe(): Subscription {
        val delegate = object : Subscriber<T>() {
            override fun onNext(t: T) {
                next.invoke(t)
            }

            override fun onCompleted() {
                completed.invoke()
            }

            override fun onError(e: Throwable?) {
                // A null throwable is dropped without calling the error handler.
                if (e != null) {
                    error.invoke(e)
                }
            }
        }
        return observable.subscribe(delegate)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment