Skip to content

Instantly share code, notes, and snippets.

@krebernisak
Forked from gsoltis/RxFirebase.java
Last active November 23, 2015 11:05
Show Gist options
  • Save krebernisak/ba50705fbf919972fa44 to your computer and use it in GitHub Desktop.
Save krebernisak/ba50705fbf919972fa44 to your computer and use it in GitHub Desktop.
RxJava Bindings for Firebase
package com.firebase.client;
import com.firebase.client.AuthData;
import com.firebase.client.ChildEventListener;
import com.firebase.client.DataSnapshot;
import com.firebase.client.Firebase;
import com.firebase.client.FirebaseError;
import com.firebase.client.Query;
import com.firebase.client.ValueEventListener;
import com.firebase.client.core.view.Event;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.subscriptions.Subscriptions;
import static com.firebase.client.core.view.Event.EventType.CHILD_ADDED;
import static com.firebase.client.core.view.Event.EventType.CHILD_CHANGED;
import static com.firebase.client.core.view.Event.EventType.CHILD_MOVED;
import static com.firebase.client.core.view.Event.EventType.CHILD_REMOVED;
public class RxFirebase {
/**
* Essentially a struct so that we can pass all children events through as a single object.
*/
public static class FirebaseChildEvent {
public final DataSnapshot snapshot;
public final Event.EventType eventType;
public final String prevName;
public FirebaseChildEvent(DataSnapshot snapshot, Event.EventType eventType, String prevName) {
this.snapshot = snapshot;
this.eventType = eventType;
this.prevName = prevName;
}
}
public static class RxFirebaseException extends RuntimeException {
public final FirebaseError error;
private RxFirebaseException(FirebaseError error) {
super(error.getMessage(), error.toException());
this.error = error;
}
public static RxFirebaseException from(FirebaseError error) {
return new RxFirebaseException(error);
}
}
public static Observable<FirebaseChildEvent> observeChildren(final Query ref) {
return Observable.create(new Observable.OnSubscribe<FirebaseChildEvent>() {
@Override
public void call(final Subscriber<? super FirebaseChildEvent> subscriber) {
final ChildEventListener listener = ref.addChildEventListener(new ChildEventListener() {
@Override
public void onChildAdded(DataSnapshot dataSnapshot, String prevName) {
subscriber.onNext(new FirebaseChildEvent(dataSnapshot, CHILD_ADDED, prevName));
}
@Override
public void onChildChanged(DataSnapshot dataSnapshot, String prevName) {
subscriber.onNext(new FirebaseChildEvent(dataSnapshot, CHILD_CHANGED, prevName));
}
@Override
public void onChildRemoved(DataSnapshot dataSnapshot) {
subscriber.onNext(new FirebaseChildEvent(dataSnapshot, CHILD_REMOVED, null));
}
@Override
public void onChildMoved(DataSnapshot dataSnapshot, String prevName) {
subscriber.onNext(new FirebaseChildEvent(dataSnapshot, CHILD_MOVED, prevName));
}
@Override
public void onCancelled(FirebaseError error) {
// Turn the FirebaseError into a throwable to conform to the API
subscriber.onError(RxFirebaseException.from(error));
}
});
// When the subscription is cancelled, remove the listener
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
ref.removeEventListener(listener);
}
}));
}
});
}
private static Func1<FirebaseChildEvent, Boolean> makeEventFilter(final Event.EventType eventType) {
return new Func1<FirebaseChildEvent, Boolean>() {
@Override
public Boolean call(FirebaseChildEvent firebaseChildEvent) {
return firebaseChildEvent.eventType == eventType;
}
};
}
public static Observable<FirebaseChildEvent> observeChildAdded(Query ref) {
return observeChildren(ref).filter(makeEventFilter(CHILD_ADDED));
}
public static Observable<FirebaseChildEvent> observeChildChanged(Query ref) {
return observeChildren(ref).filter(makeEventFilter(CHILD_CHANGED));
}
public static Observable<FirebaseChildEvent> observeChildMoved(Query ref) {
return observeChildren(ref).filter(makeEventFilter(CHILD_MOVED));
}
public static Observable<FirebaseChildEvent> observeChildRemoved(Query ref) {
return observeChildren(ref).filter(makeEventFilter(CHILD_REMOVED));
}
public static Observable<DataSnapshot> observe(final Query ref) {
return Observable.create(new Observable.OnSubscribe<DataSnapshot>() {
@Override
public void call(final Subscriber<? super DataSnapshot> subscriber) {
final ValueEventListener listener = ref.addValueEventListener(new ValueEventListener() {
@Override
public void onDataChange(DataSnapshot dataSnapshot) {
subscriber.onNext(dataSnapshot);
}
@Override
public void onCancelled(FirebaseError error) {
// Turn the FirebaseError into a throwable to conform to the API
subscriber.onError(RxFirebaseException.from(error));
}
});
// When the subscription is cancelled, remove the listener
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
ref.removeEventListener(listener);
}
}));
}
});
}
public static Observable<DataSnapshot> once(final Query ref) {
return Observable.create(new Observable.OnSubscribe<DataSnapshot>() {
@Override
public void call(final Subscriber<? super DataSnapshot> subscriber) {
ref.addListenerForSingleValueEvent(new ValueEventListener() {
@Override
public void onDataChange(DataSnapshot dataSnapshot) {
subscriber.onNext(dataSnapshot);
}
@Override
public void onCancelled(FirebaseError error) {
// Turn the FirebaseError into a throwable to conform to the API
subscriber.onError(RxFirebaseException.from(error));
}
});
}
});
}
public static Observable<Firebase> setValue(final Firebase ref, final Object value) {
return Observable.create(new Observable.OnSubscribe<Firebase>() {
@Override
public void call(final Subscriber<? super Firebase> subscriber) {
ref.setValue(value, new Firebase.CompletionListener() {
@Override
public void onComplete(FirebaseError error, Firebase firebase) {
if (error != null) {
subscriber.onError(RxFirebaseException.from(error));
return;
}
subscriber.onNext(firebase);
subscriber.onCompleted();
}
});
}
});
}
public static Observable<AuthData> observeAuthState(final Firebase ref) {
return Observable.create(new Observable.OnSubscribe<AuthData>() {
@Override
public void call(final Subscriber<? super AuthData> subscriber) {
final Firebase.AuthStateListener listener = ref.addAuthStateListener(new Firebase.AuthStateListener() {
@Override
public void onAuthStateChanged (AuthData authData) {
subscriber.onNext(authData);
}
});
// When the subscription is cancelled, remove the listener
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
ref.removeAuthStateListener(listener);
}
}));
}
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment