Skip to content

Instantly share code, notes, and snippets.

@odbol
Created August 3, 2021 20:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save odbol/382aab7c33c1f7e17f83a01bb7451b96 to your computer and use it in GitHub Desktop.
Save odbol/382aab7c33c1f7e17f83a01bb7451b96 to your computer and use it in GitHub Desktop.
Utilities for converting Firebase results to RxJava Observables
package com.odbol.rx
import com.google.android.gms.tasks.Task
import com.google.firebase.firestore.DocumentReference
import com.google.firebase.firestore.DocumentSnapshot
import com.google.firebase.firestore.ListenerRegistration
import io.reactivex.Emitter
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.SingleEmitter
/**
 * Listens for snapshot updates on [ref] and emits them from an [Observable].
 *
 * A new snapshot listener is attached to [ref] for every subscription. That listener is removed
 * when the subscription is disposed or after an error has been signalled, so each subscriber
 * owns (and cleans up) its own registration.
 *
 * Warning: null snapshots are purposefully ignored and will not be propagated down the stream.
 *
 * @param ref the document to listen to.
 * @return an observable that emits every non-null snapshot delivered to the listener and signals
 * `onError` with the exception reported by the listener, if any.
 */
fun snapshotToObservable(ref: DocumentReference): Observable<DocumentSnapshot> {
    return Observable.create<DocumentSnapshot> { emitter ->
        // The registration is local to this subscription, so concurrent or repeated
        // subscriptions never overwrite (and thereby leak) each other's listener.
        val registration = ref.addSnapshotListener { value, e ->
            if (e != null) {
                emitter.onError(e)
            } else if (value != null) {
                emitter.onNext(value)
            }
        }
        // Tie the listener's lifetime to the emitter: it is removed on dispose and on error,
        // and immediately if the emitter was already disposed before we got here.
        emitter.setCancellable { registration.remove() }
    }
}
/**
 * Listens to a [Task] from Firebase and emits its outcome as a [Single].
 *
 * Outcomes are mapped as follows:
 * - the task succeeded with a non-null result: `onSuccess` with that result;
 * - the task reports an exception: `onError` with that exception;
 * - the task succeeded but its result is null: `onError` with a [NoSuchElementException],
 *   because a [Single] cannot emit null;
 * - anything else: `onError` with `Exception("Task canceled")`.
 *
 * Errors are not signalled once the returned [Single] has been disposed, since there is no
 * subscriber left to receive them.
 *
 * @param task the task to observe; a completion listener is added to it on every subscription.
 * @return a single that mirrors the completion of [task].
 */
fun <T> taskToObservable(task: Task<T>): Single<T> {
    return Single.create { emitter: SingleEmitter<T> ->
        task.addOnCompleteListener { completed ->
            // Only read the result after a successful completion, mirroring the original
            // short-circuit (`isSuccessful && result != null`).
            val result = if (completed.isSuccessful) completed.result else null
            val failure = completed.exception
            when {
                result != null -> emitter.onSuccess(result)
                // The completion listener cannot be detached here, so it may fire after the
                // subscriber is gone; skip the error instead of raising an undeliverable one.
                emitter.isDisposed -> Unit
                failure != null -> emitter.onError(failure)
                completed.isSuccessful ->
                    emitter.onError(NoSuchElementException("Task completed successfully with a null result"))
                else -> emitter.onError(Exception("Task canceled"))
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment