@renanferrari
Created November 29, 2016 12:41
RxFirebaseUtils
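import com.google.android.gms.tasks.Task;
import com.google.firebase.database.DataSnapshot;
import com.google.firebase.database.DatabaseReference;
import java.util.Arrays;
import java.util.List;
import rx.Observable;
import rx.functions.FuncN;
import rx.schedulers.Schedulers;
// RxHandler and RxFirebaseDatabase are not defined in this gist; they are assumed to come from an RxFirebase wrapper library.

public class RxFirebaseUtils {

  // Wraps a Task in an Observable and delivers its result on the io scheduler.
  public static <T> Observable<T> observeTask(final Task<T> task) {
    return Observable.<T>create(subscriber -> RxHandler.assignOnTask(subscriber, task))
        .observeOn(Schedulers.io());
  }

  // Client-side join: for each child key under keysRef, observes objectsRef.child(key)
  // and emits the latest snapshots of all joined objects as one list.
  public static Observable<List<DataSnapshot>> join(final DatabaseReference keysRef,
      final DatabaseReference objectsRef) {
    return RxFirebaseDatabase.observeValueEvent(keysRef)
        .observeOn(Schedulers.io())
        .map(DataSnapshot::getChildren)
        .flatMap(keySnapshots -> Observable.from(keySnapshots)
            .observeOn(Schedulers.io())
            .map(DataSnapshot::getKey)
            .map(key -> RxFirebaseDatabase.observeValueEvent(objectsRef.child(key)))
            .toList()
            .flatMap(observables -> Observable.combineLatest(observables, combiner())));
  }

  // Narrows the Object[] handed over by combineLatest to a List<DataSnapshot>.
  private static FuncN<List<DataSnapshot>> combiner() {
    return array -> Arrays.asList(Arrays.copyOf(array, array.length, DataSnapshot[].class));
  }
}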
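
The combiner() helper relies on the three-argument Arrays.copyOf overload to turn the Object[] that combineLatest passes to its FuncN into a typed array. The following is a minimal, standalone sketch of that conversion using only the JDK. CombinerSketch and toTypedList are hypothetical names introduced for illustration, and String stands in for DataSnapshot so the example runs without Firebase or RxJava.

```java
import java.util.Arrays;
import java.util.List;

class CombinerSketch {

    // Hypothetical stand-in for the gist's combiner(): narrows an Object[]
    // into a typed, fixed-size List backed by a freshly copied array.
    static <T> List<T> toTypedList(Object[] latest, Class<T[]> arrayType) {
        // Arrays.copyOf throws ArrayStoreException if an element is not a T.
        return Arrays.asList(Arrays.copyOf(latest, latest.length, arrayType));
    }

    public static void main(String[] args) {
        Object[] latest = {"a", "b", "c"};
        List<String> typed = toTypedList(latest, String[].class);
        System.out.println(typed); // prints [a, b, c]
    }
}
```

Because the list comes from Arrays.asList, it is fixed-size: callers can read and replace elements but cannot add or remove them.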