Skip to content

Instantly share code, notes, and snippets.

@odbol
Created August 3, 2021 20:38
Embed
What would you like to do?
Utilities for converting Firebase results to RxJava Observables
package com.odbol.rx
import com.google.android.gms.tasks.Task
import com.google.firebase.firestore.DocumentReference
import com.google.firebase.firestore.DocumentSnapshot
import com.google.firebase.firestore.ListenerRegistration
import io.reactivex.Emitter
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.SingleEmitter
/**
 * Listens for updates from Firebase and emits them from an observable.
 *
 * Every subscription registers its own snapshot listener on [ref]. That listener is removed
 * when the subscriber disposes or when the stream terminates with an error, so no registration
 * outlives its subscription.
 *
 * Warning: null snapshots are purposefully ignored and will not be propagated down the stream.
 *
 * @param ref the document whose snapshots should be observed.
 * @return an observable that emits each non-null snapshot delivered to the listener and signals
 * `onError` with the exception the listener reports, if any. It never completes on its own.
 */
fun snapshotToObservable(ref: DocumentReference): Observable<DocumentSnapshot> =
    Observable.create<DocumentSnapshot> { emitter ->
        // The registration is local to this subscription; sharing it across subscribers would let
        // a later subscription overwrite (and therefore leak) an earlier listener.
        val registration = ref.addSnapshotListener { snapshot, error ->
            // A callback may still arrive after disposal; signalling then would push the error
            // to RxJava's global error handler instead of a subscriber.
            if (emitter.isDisposed) return@addSnapshotListener
            if (error != null) {
                emitter.onError(error)
            } else if (snapshot != null) {
                emitter.onNext(snapshot)
            }
        }
        // Tie the listener's lifetime to the emitter so it is released on dispose and on error.
        emitter.setCancellable { registration.remove() }
    }
/**
 * Listens to a Task from Firebase and emits its outcome as a [Single].
 *
 * Outcomes are mapped as follows:
 * - successful with a non-null result: `onSuccess` with that result;
 * - failed with an exception: `onError` with the task's exception;
 * - unsuccessful without an exception: `onError` with `Exception("Task canceled")`;
 * - successful with a null result (for example a `Task<Void>`): `onError` with a
 *   [NoSuchElementException], because a [Single] cannot emit null.
 *
 * Nothing is signalled if the subscriber has already disposed when the task completes.
 *
 * @param task the task to observe.
 * @return a single that mirrors the task's outcome as described above.
 */
fun <T> taskToObservable(task: Task<T>): Single<T> {
    return Single.create { emitter: SingleEmitter<T> ->
        task.addOnCompleteListener { completed ->
            // Signalling an error to a disposed emitter would route it to RxJava's global handler.
            if (emitter.isDisposed) return@addOnCompleteListener

            if (!completed.isSuccessful) {
                // Only read `exception` here; `result` must not be touched on an unsuccessful task.
                emitter.onError(completed.exception ?: Exception("Task canceled"))
                return@addOnCompleteListener
            }

            // Read the result once so the null check and the emitted value cannot disagree.
            val result: T? = completed.result
            if (result != null) {
                emitter.onSuccess(result)
            } else {
                emitter.onError(NoSuchElementException("Task succeeded with a null result"))
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment