@gsoltis
Last active May 6, 2024 03:16
RxJava Bindings for Firebase
package com.firebase.client;

import com.firebase.client.core.Constants;

import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.subscriptions.Subscriptions;

public class RxFirebase {

    /**
     * Essentially a struct so that we can pass all children events through as a single object.
     */
    public static class FirebaseChildEvent {
        public DataSnapshot snapshot;
        public Constants.EventType eventType;
        public String prevName;

        public FirebaseChildEvent(DataSnapshot snapshot, Constants.EventType eventType, String prevName) {
            this.snapshot = snapshot;
            this.eventType = eventType;
            this.prevName = prevName;
        }
    }

    /**
     * Emits every child event (added, changed, removed, moved) for the given query.
     * The Firebase listener is attached on subscribe and removed on unsubscribe.
     */
    public static Observable<FirebaseChildEvent> observeChildren(final Query ref) {
        return Observable.create(new Observable.OnSubscribe<FirebaseChildEvent>() {
            @Override
            public void call(final Subscriber<? super FirebaseChildEvent> subscriber) {
                final ChildEventListener listener = ref.addChildEventListener(new ChildEventListener() {
                    @Override
                    public void onChildAdded(DataSnapshot dataSnapshot, String prevName) {
                        subscriber.onNext(new FirebaseChildEvent(dataSnapshot, Constants.EventType.CHILD_ADDED, prevName));
                    }

                    @Override
                    public void onChildChanged(DataSnapshot dataSnapshot, String prevName) {
                        subscriber.onNext(new FirebaseChildEvent(dataSnapshot, Constants.EventType.CHILD_CHANGED, prevName));
                    }

                    @Override
                    public void onChildRemoved(DataSnapshot dataSnapshot) {
                        // Removal events carry no previous sibling name
                        subscriber.onNext(new FirebaseChildEvent(dataSnapshot, Constants.EventType.CHILD_REMOVED, null));
                    }

                    @Override
                    public void onChildMoved(DataSnapshot dataSnapshot, String prevName) {
                        subscriber.onNext(new FirebaseChildEvent(dataSnapshot, Constants.EventType.CHILD_MOVED, prevName));
                    }

                    @Override
                    public void onCancelled(FirebaseError error) {
                        // Turn the FirebaseError into a throwable to conform to the API
                        subscriber.onError(new FirebaseException(error.getMessage()));
                    }
                });

                // When the subscription is cancelled, remove the listener
                subscriber.add(Subscriptions.create(new Action0() {
                    @Override
                    public void call() {
                        ref.removeEventListener(listener);
                    }
                }));
            }
        });
    }

    /**
     * Builds a predicate that passes only events of the given type.
     */
    private static Func1<FirebaseChildEvent, Boolean> makeEventFilter(final Constants.EventType eventType) {
        return new Func1<FirebaseChildEvent, Boolean>() {
            @Override
            public Boolean call(FirebaseChildEvent firebaseChildEvent) {
                return firebaseChildEvent.eventType == eventType;
            }
        };
    }

    // Each helper below subscribes to all child events and filters down to a single event type.

    public static Observable<FirebaseChildEvent> observeChildAdded(Query ref) {
        return observeChildren(ref).filter(makeEventFilter(Constants.EventType.CHILD_ADDED));
    }

    public static Observable<FirebaseChildEvent> observeChildChanged(Query ref) {
        return observeChildren(ref).filter(makeEventFilter(Constants.EventType.CHILD_CHANGED));
    }

    public static Observable<FirebaseChildEvent> observeChildMoved(Query ref) {
        return observeChildren(ref).filter(makeEventFilter(Constants.EventType.CHILD_MOVED));
    }

    public static Observable<FirebaseChildEvent> observeChildRemoved(Query ref) {
        return observeChildren(ref).filter(makeEventFilter(Constants.EventType.CHILD_REMOVED));
    }

    /**
     * Emits a DataSnapshot each time the value at the given query changes.
     * The Firebase listener is attached on subscribe and removed on unsubscribe.
     */
    public static Observable<DataSnapshot> observe(final Query ref) {
        return Observable.create(new Observable.OnSubscribe<DataSnapshot>() {
            @Override
            public void call(final Subscriber<? super DataSnapshot> subscriber) {
                final ValueEventListener listener = ref.addValueEventListener(new ValueEventListener() {
                    @Override
                    public void onDataChange(DataSnapshot dataSnapshot) {
                        subscriber.onNext(dataSnapshot);
                    }

                    @Override
                    public void onCancelled(FirebaseError error) {
                        // Turn the FirebaseError into a throwable to conform to the API
                        subscriber.onError(new FirebaseException(error.getMessage()));
                    }
                });

                // When the subscription is cancelled, remove the listener
                subscriber.add(Subscriptions.create(new Action0() {
                    @Override
                    public void call() {
                        ref.removeEventListener(listener);
                    }
                }));
            }
        });
    }
}

@ryanjohn1

This looks really useful!
I was wondering if you could add some kind of licence if you are happy for other people to use this in their work?

@xAIdrian

Where are you getting Constants.EventType from? Is there someplace I could get access to those variables?

@ojacquemart

For anyone interested, here is the missing enum declaration:

enum EventType {
    CHILD_ADDED, CHILD_CHANGED, CHILD_REMOVED, CHILD_MOVED
}

@TinasheMzondiwa

If anyone is interested I have forked this and made my own adjustments here: https://gist.github.com/TinasheMzondiwa/3b330c72a5b6625d66c2e1b63abaac9b

@putt

putt commented Oct 23, 2016

Will the "new Observable.OnSubscribe" block cause a memory leak?
After calling .subscribe(), I can unsubscribe the subscription in onDestroy(). What happens to the running "new Observable.OnSubscribe" block then?
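
On the unsubscribe question: in the gist, the action passed to Subscriptions.create(...) and registered with subscriber.add(...) is what calls ref.removeEventListener(listener), so unsubscribing should detach the Firebase listener. Below is a minimal plain-Java sketch of that teardown pattern. It does not use the real libraries: FakeQuery, Subscription, and observe are hypothetical stand-ins invented for illustration, not Firebase or RxJava classes, so it only models the idea and says nothing definitive about how either library manages memory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Stand-in sketch: none of these types are the real Firebase or RxJava classes.
class ListenerLifecycleSketch {

    /** Hypothetical stand-in for a Firebase Query that holds registered listeners. */
    static class FakeQuery {
        private final List<Consumer<String>> listeners = new ArrayList<>();

        Consumer<String> addListener(Consumer<String> listener) {
            listeners.add(listener);
            return listener; // like the gist, hand back the listener so it can be removed later
        }

        void removeListener(Consumer<String> listener) {
            listeners.remove(listener);
        }

        void emit(String value) {
            // Iterate over a copy so a listener may be removed during delivery.
            for (Consumer<String> listener : new ArrayList<>(listeners)) {
                listener.accept(value);
            }
        }

        int listenerCount() {
            return listeners.size();
        }
    }

    /** Hypothetical stand-in for an Rx subscription: runs its teardown action once. */
    static class Subscription {
        private Runnable teardown;

        Subscription(Runnable teardown) {
            this.teardown = teardown;
        }

        void unsubscribe() {
            if (teardown != null) {
                teardown.run();
                teardown = null; // repeated unsubscribe calls become no-ops
            }
        }
    }

    /** Attaches a listener and returns a handle whose unsubscribe() detaches it. */
    static Subscription observe(FakeQuery query, Consumer<String> onNext) {
        Consumer<String> listener = query.addListener(onNext);
        return new Subscription(() -> query.removeListener(listener));
    }

    public static void main(String[] args) {
        FakeQuery query = new FakeQuery();
        List<String> received = new ArrayList<>();

        Subscription subscription = observe(query, received::add);
        query.emit("first");
        subscription.unsubscribe();
        query.emit("second"); // no listener is attached any more

        System.out.println(received + " listeners=" + query.listenerCount());
    }
}
```

In this model, once unsubscribe() runs, the query no longer holds a reference to the listener, so nothing reachable from the query keeps the subscriber alive. Whether the same holds in a real app depends on unsubscribing in the matching lifecycle callback, for example onDestroy().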

@hendisantika

Hi, nice share @gsoltis.
But how would I do this in plain Java? This is the Android way.
Thanks.
