Skip to content

Instantly share code, notes, and snippets.

@magillus
Created September 14, 2017 22:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save magillus/c0795671c10d191001d1f61d9db66ec5 to your computer and use it in GitHub Desktop.
Save magillus/c0795671c10d191001d1f61d9db66ec5 to your computer and use it in GitHub Desktop.
RxRealm Flowable wrappers with Looper
package com.example.playground;
import android.os.HandlerThread
import android.os.Process
/**
 * Looper-backed [HandlerThread] used to run Realm work off the caller's thread.
 *
 * The thread is named "Scheduler-Realm-BackgroundThread" and is created with
 * [Process.THREAD_PRIORITY_BACKGROUND].
 */
class BackgroundThread : HandlerThread(
    "Scheduler-Realm-BackgroundThread",
    Process.THREAD_PRIORITY_BACKGROUND
)
package com.example.playground;
import android.os.Handler
import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable
import io.reactivex.FlowableEmitter
import io.reactivex.FlowableOnSubscribe
import io.reactivex.disposables.Disposables
import io.realm.*
/**
 * Rx wrapper for Realm object changes.
 *
 * Every Realm action (opening the instance, fetching the object, registering and removing the
 * change listener, copying, closing) is posted to the [Handler] of a [BackgroundThread], so it
 * all happens on the same Looper thread. Downstream only ever receives detached copies created
 * with `copyFromRealm`.
 *
 * Copyright 2017 Mateusz Perlak - http://www.apache.org/licenses/LICENSE-2.0
 *
 * @param fetchManageObject looks up the managed object in the Realm opened for this subscription;
 * returning `null` completes the stream immediately.
 * @param changeProperties names of the fields whose changes should be emitted; `null` emits on
 * every change, an empty list never emits a change.
 * @param realmConfig configuration of the Realm to open; `null` opens the default Realm.
 */
class RxRealmObjectFlowable<T : RealmObject>(
    private val fetchManageObject: (Realm) -> T?,
    private val changeProperties: List<String>? = null,
    private val realmConfig: RealmConfiguration?
) : FlowableOnSubscribe<T> {

    /** Thread whose Looper runs all Realm work; created and started on first [prepareHandler] call. */
    var backgroundThread: BackgroundThread? = null

    // Must stay declared after backgroundThread: property initializers run in declaration order.
    private var handler = prepareHandler()

    /**
     * Returns a [Handler] bound to the Looper of [backgroundThread], creating and starting the
     * thread first if it does not exist yet.
     */
    fun prepareHandler(): Handler {
        val thread = backgroundThread ?: BackgroundThread().also { created ->
            backgroundThread = created
            created.start()
        }
        return Handler(thread.looper)
    }

    /**
     * Opens a Realm, emits a copy of the fetched object, then emits a new copy on every
     * relevant change until the subscription is disposed or the object is deleted.
     *
     * @param e emitter of the new subscription; a `null` emitter is ignored.
     */
    override fun subscribe(e: FlowableEmitter<T>?) {
        val emitter = e ?: return
        handler.post {
            val realm = openRealm()
            val managed = fetchManageObject(realm)
            if (managed == null) {
                // Nothing to observe: release the Realm instance before finishing the stream.
                realm.close()
                emitter.onComplete()
                return@post
            }

            val listener = RealmObjectChangeListener<T> { changed, changeSet ->
                if (changeSet?.isDeleted == true) {
                    // A deleted object is invalid and cannot be copied; no further changes can
                    // arrive, so finish the stream. Completing disposes the emitter, which runs
                    // the cleanup registered below.
                    emitter.onComplete()
                } else if (isRelevantChange(changeSet)) {
                    emitter.onNext(realm.copyFromRealm(changed))
                }
            }

            managed.addChangeListener(listener)
            emitter.onNext(realm.copyFromRealm(managed))
            emitter.setDisposable(Disposables.fromRunnable {
                // Listener removal and close must run on the thread that opened the Realm.
                handler.post {
                    managed.removeChangeListener(listener)
                    realm.close()
                }
            })
        }
    }

    /** Opens the Realm for [realmConfig], or the default Realm when no configuration was given. */
    private fun openRealm(): Realm =
        if (realmConfig == null) Realm.getDefaultInstance() else Realm.getInstance(realmConfig)

    /**
     * Tells whether a change notification should be forwarded downstream.
     *
     * Without a [changeProperties] filter every change is relevant. A notification that carries
     * no change details cannot be filtered and is treated as relevant as well.
     */
    private fun isRelevantChange(changeSet: ObjectChangeSet?): Boolean {
        val tracked = changeProperties ?: return true
        if (changeSet == null) return true
        return tracked.any { field -> changeSet.isFieldChanged(field) }
    }

    companion object {
        /**
         * Creates a [Flowable] emitting copies of the object on every change.
         * Uses [BackpressureStrategy.LATEST].
         */
        fun <T : RealmObject> create(
            fetchManageObject: (Realm) -> T?,
            realmConfig: RealmConfiguration? = null
        ): Flowable<T> {
            return Flowable.create(
                RxRealmObjectFlowable(fetchManageObject, null, realmConfig),
                BackpressureStrategy.LATEST
            )
        }

        /**
         * Creates a [Flowable] emitting copies of the object only when one of
         * [changeProperties] changed. Uses [BackpressureStrategy.LATEST].
         */
        fun <T : RealmObject> create(
            fetchManageObject: (Realm) -> T?,
            changeProperties: List<String>?,
            realmConfig: RealmConfiguration? = null
        ): Flowable<T> {
            return Flowable.create(
                RxRealmObjectFlowable(fetchManageObject, changeProperties, realmConfig),
                BackpressureStrategy.LATEST
            )
        }
    }
}
package com.example.playground;
import android.os.Handler
import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable
import io.reactivex.FlowableEmitter
import io.reactivex.FlowableOnSubscribe
import io.reactivex.disposables.Disposables
import io.realm.*
/**
 * Rx wrapper for Realm query result changes.
 *
 * All Realm actions happen on the same Looper: the one owned by [backgroundThread].
 * Each emission is a detached copy of the current query results.
 *
 * Copyright 2017 Mateusz Perlak - http://www.apache.org/licenses/LICENSE-2.0
 *
 * @param fetchQuery builds the query against the Realm opened for the subscription.
 * @param realmConfiguration configuration of the Realm to open; `null` opens the default Realm.
 */
class RxRealmQueryFlowable<T : RealmObject>(
    val fetchQuery: (Realm) -> RealmQuery<T>,
    val realmConfiguration: RealmConfiguration?
) : FlowableOnSubscribe<List<T>> {

    /** Thread whose Looper runs all Realm work; created and started on first [prepareHandler] call. */
    var backgroundThread: BackgroundThread? = null

    // Declared after backgroundThread on purpose: initializers run in declaration order.
    private var handler = prepareHandler()

    /**
     * Returns a [Handler] bound to the Looper of [backgroundThread], creating and starting the
     * thread first when there is none yet.
     */
    fun prepareHandler(): Handler {
        val thread = backgroundThread ?: BackgroundThread().also { created ->
            backgroundThread = created
            created.start()
        }
        return Handler(thread.looper)
    }

    /**
     * Runs the query on the handler thread, emits a copy of the results, and emits a fresh copy
     * on every change until the subscription is disposed.
     *
     * @param emitter emitter of the new subscription; a `null` emitter is ignored.
     */
    override fun subscribe(emitter: FlowableEmitter<List<T>>?) {
        val downstream = emitter ?: return
        handler.post { observeQuery(downstream) }
    }

    /** Performs the Realm work of one subscription; must be called on the handler thread. */
    private fun observeQuery(downstream: FlowableEmitter<List<T>>) {
        val realm = openRealm()
        val onResultsChanged = RealmChangeListener<RealmResults<T>> { results ->
            downstream.onNext(realm.copyFromRealm(results))
        }

        val liveResults = fetchQuery(realm).findAll()
        liveResults.addChangeListener(onResultsChanged)
        downstream.onNext(realm.copyFromRealm(liveResults))

        downstream.setDisposable(Disposables.fromRunnable {
            // Clean up on the thread that opened the Realm.
            handler.post {
                liveResults.removeChangeListener(onResultsChanged)
                realm.close()
            }
        })
    }

    /** Opens the Realm for [realmConfiguration], or the default Realm when it is `null`. */
    private fun openRealm(): Realm {
        val configuration = realmConfiguration
        return if (configuration != null) {
            Realm.getInstance(configuration)
        } else {
            Realm.getDefaultInstance()
        }
    }

    companion object {
        /**
         * Creates a [Flowable] emitting copies of the query results on every change.
         * Uses [BackpressureStrategy.LATEST].
         */
        fun <T : RealmObject> create(
            fetchQuery: (Realm) -> RealmQuery<T>,
            realmConfig: RealmConfiguration?
        ): Flowable<List<T>> {
            val source = RxRealmQueryFlowable(fetchQuery, realmConfig)
            return Flowable.create(source, BackpressureStrategy.LATEST)
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment