Skip to content

Instantly share code, notes, and snippets.

@AAverin
Last active April 17, 2020 11:09
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save AAverin/e58f4fee0ad4f9393f9825ca3e32fb04 to your computer and use it in GitHub Desktop.
Save AAverin/e58f4fee0ad4f9393f9825ca3e32fb04 to your computer and use it in GitHub Desktop.
Kotlin Rx Extensions
/**
 * Dependency-injection module exposing the scheduler-related singletons:
 * the [Schedulers] facade and the [LooperThread] it is built on.
 */
@Module
class AppModule() {
    //...

    /**
     * Provides the [Schedulers] binding as an [RxSchedulers] backed by [looperThread].
     */
    @Provides
    @Singleton
    fun getSchedulers(looperThread: LooperThread): Schedulers = RxSchedulers(looperThread)

    /**
     * Creates the [LooperThread] and starts it before handing it out.
     */
    @Provides
    @Singleton
    fun getLooperThread(): LooperThread = LooperThread().apply { start() }

    //...
}
import rx.Observable
import rx.Observer
import rx.Subscriber
import rx.Subscription
import rx.functions.Action0
import rx.functions.Action1
/**
 * Subscribes [subscriber] to this observable, subscribing on [Schedulers.io]
 * and observing on [Schedulers.mainThread].
 *
 * @param schedulers provider of the schedulers used for subscribing and observing
 * @param subscriber receiver of the emitted items and terminal events
 * @return the [Subscription] produced by the underlying `subscribe` call
 */
fun <T> Observable<T>.uiSubscribe(schedulers: Schedulers, subscriber: Subscriber<in T>): Subscription {
    val deliveredOnMain = subscribeOn(schedulers.io).observeOn(schedulers.mainThread)
    return deliveredOnMain.subscribe(subscriber)
}
/**
 * Subscribes [observer] to this observable, subscribing on [Schedulers.io]
 * and observing on [Schedulers.mainThread].
 *
 * @param schedulers provider of the schedulers used for subscribing and observing
 * @param observer receiver of the emitted items and terminal events
 * @return the [Subscription] produced by the underlying `subscribe` call
 */
fun <T> Observable<T>.uiSubscribe(schedulers: Schedulers, observer: Observer<in T>): Subscription {
    val deliveredOnMain = subscribeOn(schedulers.io).observeOn(schedulers.mainThread)
    return deliveredOnMain.subscribe(observer)
}
/**
 * Subscribes the given RxJava actions to this observable, subscribing on
 * [Schedulers.io] and observing on [Schedulers.mainThread].
 *
 * RxJava 1's `subscribe(Action1, Action1, Action0)` rejects `null` callbacks with an
 * `IllegalArgumentException`, so omitted callbacks are replaced by no-op actions here.
 * This mirrors the function-type overload of `uiSubscribe`, which also treats a
 * missing callback as "do nothing".
 *
 * @param schedulers provider of the schedulers used for subscribing and observing
 * @param onNext invoked for every emitted item
 * @param onError invoked when the observable terminates with an error; when `null`
 *   the error is ignored
 * @param onComplete invoked when the observable completes; when `null` completion is ignored
 * @return the [Subscription] produced by the underlying `subscribe` call
 */
fun <T> Observable<T>.uiSubscribe(schedulers: Schedulers,
                                  onNext: Action1<in T>,
                                  onError: Action1<Throwable>? = null,
                                  onComplete: Action0? = null): Subscription {
    // Never hand null to subscribe(): RxJava throws before the subscription is even made.
    val errorAction = onError ?: Action1<Throwable> { }
    val completeAction = onComplete ?: Action0 { }
    return subscribeOn(schedulers.io)
        .observeOn(schedulers.mainThread)
        .subscribe(onNext, errorAction, completeAction)
}
/**
 * Convenience overload that subscribes with only an [onNext] callback.
 *
 * @param schedulers provider of the schedulers used for subscribing and observing
 * @param onNext invoked for every emitted item
 * @return the [Subscription] produced by the delegated `uiSubscribe` call
 */
fun <T> Observable<T>.uiSubscribe(schedulers: Schedulers, onNext: (T) -> Unit): Subscription =
    // The explicit nulls are required: without them this call would resolve
    // back to this very two-argument overload.
    uiSubscribe(schedulers, onNext, null, null)
/**
 * Subscribes the given Kotlin callbacks to this observable, subscribing on
 * [Schedulers.io] and observing on [Schedulers.mainThread].
 *
 * @param schedulers provider of the schedulers used for subscribing and observing
 * @param onNext invoked for every emitted item
 * @param onError invoked with the error on failure; when `null` the error is dropped silently
 * @param onComplete invoked on completion; when `null` completion is ignored
 * @return the [Subscription] produced by the underlying `subscribe` call
 */
fun <T> Observable<T>.uiSubscribe(schedulers: Schedulers,
                                  onNext: (T) -> Unit,
                                  onError: ((Throwable) -> Unit)? = null,
                                  onComplete: (() -> Unit)? = null): Subscription {
    // Adapt the Kotlin function types to the RxJava action interfaces.
    val nextAction = Action1<T> { item -> onNext(item) }
    val errorAction = Action1<Throwable> { failure -> onError?.invoke(failure) }
    val completeAction = Action0 { onComplete?.invoke() }

    return subscribeOn(schedulers.io)
        .observeOn(schedulers.mainThread)
        .subscribe(nextAction, errorAction, completeAction)
}
import android.os.Looper
/**
 * A thread that prepares a [Looper] for itself and then runs the message loop.
 *
 * The looper only exists once [run] has executed `Looper.prepare()`, which happens
 * asynchronously after `start()`. [getLooper] therefore waits until the looper has
 * been published instead of reading a possibly uninitialised property.
 */
class LooperThread : Thread() {
    // Released by run() once the looper is assigned; countDown()/await() also make
    // the write to `looper` visible to the thread calling getLooper().
    private val looperReady = java.util.concurrent.CountDownLatch(1)

    private var looper: Looper? = null

    /**
     * Returns this thread's [Looper], blocking until [run] has created it.
     *
     * @throws IllegalStateException if the thread has not been started yet (waiting
     *   would never end), or if no looper is available after preparation
     */
    fun getLooper(): Looper {
        check(state != Thread.State.NEW) { "LooperThread must be started before its Looper is requested" }
        looperReady.await()
        return checkNotNull(looper) { "Looper was not available after Looper.prepare()" }
    }

    /**
     * Prepares the looper, publishes it to waiting callers and enters the message loop.
     */
    override fun run() {
        Looper.prepare()
        looper = Looper.myLooper()
        // Publish before loop(): Looper.loop() blocks for the lifetime of the looper.
        looperReady.countDown()
        Looper.loop()
    }
}
import rx.Scheduler
/**
 * [Schedulers] implementation in which every scheduler is RxJava's immediate scheduler.
 */
class MockSchedulers : Schedulers {
    // Looked up on every access, exactly as each overridden getter did individually.
    private val immediate: Scheduler
        get() = rx.schedulers.Schedulers.immediate()

    override val io: Scheduler get() = immediate

    override val loopedIo: Scheduler get() = immediate

    override val mainThread: Scheduler get() = immediate

    override val computation: Scheduler get() = immediate
}
import android.os.Handler
import right.apps.photo.map.data.common.LooperThread
import rx.Scheduler
import rx.android.schedulers.AndroidSchedulers
import rx.android.schedulers.HandlerScheduler
import javax.inject.Inject
import javax.inject.Singleton
/**
 * Production [Schedulers] implementation backed by RxJava and RxAndroid schedulers.
 *
 * @property looperThread thread whose looper backs [loopedIo]
 */
@Singleton
class RxSchedulers @Inject constructor(val looperThread: LooperThread) : Schedulers {
    override val io: Scheduler
        get() = rx.schedulers.Schedulers.io()

    // Created once on first use: building a new Handler and HandlerScheduler on every
    // property read would allocate needlessly for what is a single shared looper.
    override val loopedIo: Scheduler by lazy {
        HandlerScheduler.from(Handler(looperThread.getLooper()))
    }

    override val mainThread: Scheduler
        get() = AndroidSchedulers.mainThread()

    override val computation: Scheduler
        get() = rx.schedulers.Schedulers.computation()
}
import rx.Scheduler
/**
 * Abstraction over the Rx schedulers used by the app, so that implementations
 * can be swapped (see RxSchedulers and MockSchedulers).
 */
interface Schedulers {
/** Scheduler for I/O work; the `uiSubscribe` extensions subscribe on it. */
val io: Scheduler
/** Scheduler that RxSchedulers backs with a Handler on the LooperThread's looper. */
val loopedIo: Scheduler
/** Scheduler for the main thread; the `uiSubscribe` extensions observe on it. */
val mainThread: Scheduler
/** Scheduler that RxSchedulers maps to RxJava's computation scheduler. */
val computation: Scheduler
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment