Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
RxJava Bindings for Firebase
package com.firebase.client;
import com.firebase.client.core.Constants;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.subscriptions.Subscriptions;
public class RxFirebase {
/**
* Essentially a struct so that we can pass all children events through as a single object.
*/
public static class FirebaseChildEvent {
public DataSnapshot snapshot;
public Constants.EventType eventType;
public String prevName;
public FirebaseChildEvent(DataSnapshot snapshot, Constants.EventType eventType, String prevName) {
this.snapshot = snapshot;
this.eventType = eventType;
this.prevName = prevName;
}
}
public static Observable<FirebaseChildEvent> observeChildren(final Query ref) {
return Observable.create(new Observable.OnSubscribe<FirebaseChildEvent>() {
@Override
public void call(final Subscriber<? super FirebaseChildEvent> subscriber) {
final ChildEventListener listener = ref.addChildEventListener(new ChildEventListener() {
@Override
public void onChildAdded(DataSnapshot dataSnapshot, String prevName) {
subscriber.onNext(new FirebaseChildEvent(dataSnapshot, Constants.EventType.CHILD_ADDED, prevName));
}
@Override
public void onChildChanged(DataSnapshot dataSnapshot, String prevName) {
subscriber.onNext(new FirebaseChildEvent(dataSnapshot, Constants.EventType.CHILD_CHANGED, prevName));
}
@Override
public void onChildRemoved(DataSnapshot dataSnapshot) {
subscriber.onNext(new FirebaseChildEvent(dataSnapshot, Constants.EventType.CHILD_REMOVED, null));
}
@Override
public void onChildMoved(DataSnapshot dataSnapshot, String prevName) {
subscriber.onNext(new FirebaseChildEvent(dataSnapshot, Constants.EventType.CHILD_MOVED, prevName));
}
@Override
public void onCancelled(FirebaseError error) {
// Turn the FirebaseError into a throwable to conform to the API
subscriber.onError(new FirebaseException(error.getMessage()));
}
});
// When the subscription is cancelled, remove the listener
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
ref.removeEventListener(listener);
}
}));
}
});
}
private static Func1<FirebaseChildEvent, Boolean> makeEventFilter(final Constants.EventType eventType) {
return new Func1<FirebaseChildEvent, Boolean>() {
@Override
public Boolean call(FirebaseChildEvent firebaseChildEvent) {
return firebaseChildEvent.eventType == eventType;
}
};
}
public static Observable<FirebaseChildEvent> observeChildAdded(Query ref) {
return observeChildren(ref).filter(makeEventFilter(Constants.EventType.CHILD_ADDED));
}
public static Observable<FirebaseChildEvent> observeChildChanged(Query ref) {
return observeChildren(ref).filter(makeEventFilter(Constants.EventType.CHILD_CHANGED));
}
public static Observable<FirebaseChildEvent> observeChildMoved(Query ref) {
return observeChildren(ref).filter(makeEventFilter(Constants.EventType.CHILD_MOVED));
}
public static Observable<FirebaseChildEvent> observeChildRemoved(Query ref) {
return observeChildren(ref).filter(makeEventFilter(Constants.EventType.CHILD_REMOVED));
}
public static Observable<DataSnapshot> observe(final Query ref) {
return Observable.create(new Observable.OnSubscribe<DataSnapshot>() {
@Override
public void call(final Subscriber<? super DataSnapshot> subscriber) {
final ValueEventListener listener = ref.addValueEventListener(new ValueEventListener() {
@Override
public void onDataChange(DataSnapshot dataSnapshot) {
subscriber.onNext(dataSnapshot);
}
@Override
public void onCancelled(FirebaseError error) {
// Turn the FirebaseError into a throwable to conform to the API
subscriber.onError(new FirebaseException(error.getMessage()));
}
});
// When the subscription is cancelled, remove the listener
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
ref.removeEventListener(listener);
}
}));
}
});
}
}
Owner

gsoltis commented May 14, 2014

I'd love some feedback from anyone more familiar with RxJava. In particular, converting FirebaseError to a throwable seems to lose some semantic meaning (is a throwable the only way a stream can error out?). Also, any thoughts on threading would be greatly appreciated. The Firebase client will spin up its own thread to fire events, and on Android it will use the main Looper. I don't know what Rx devs normally expect in terms of what thread a particular callback will run on, so advice there would be great.

zsxwing commented May 15, 2014

ref.removeEventListener(listener); is thread-safe? Subscription.unsubscribe can be called in other thread.

zsxwing commented May 15, 2014

And if observeChildren can be called in any thread, ref.addChildEventListener should also be thread-safe.

petermd commented May 15, 2014

Not sure what you mean by "convert FirebaseException to a Throwable" - RxJava doesn't wrap the Exception so you can still handle the downstream Exception as FirebaseException (but you also need to handle / re-throw other Throwable types)

Owner

gsoltis commented May 15, 2014

@zsxwing Both methods are threadsafe, they append to a queue on a separate, internal thread managed by the library. I was mainly confused because it looks like code that wraps a Future into an observable blocks if you don't specify a TimeUnit. This code will not block ever, as all actual processing takes place on the internal queue thread.

Owner

gsoltis commented May 15, 2014

@petermd FirebaseError, the type returned by the Firebase API in the case of a problem, is not a subclass of throwable. We do have FirebaseException that's used as well, hence the conversion.

But presumably the end user gets a Throwable in their onError callback and has to do some instanceof checks to separate out different classes of errors?

zsxwing commented May 16, 2014

I was mainly confused because it looks like code that wraps a Future into an observable blocks if you don't specify a TimeUnit.

@gsoltis can you give an example? I can not find Future in your codes.

Owner

gsoltis commented May 16, 2014

@zsxwing Sorry, I wasn't clear. I meant in contrast with the official handling of Futures by the RxJava library. Specifically: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/operators/OperatorToObservableFuture.java#L65

It appears to block on the result of the future when a subscriber is added, and then fire the callback on the same thread when the value is available. My code above will not do this, which I think is right given that it is a stream of values, rather than a single value. My question was around the expected behavior though. One option, similar to what the Future Observable is doing, would be to suspend the subscribing thread and only wake it up to fire events. The code, as currently written will instead return immediately from the subscribe call and will fire events on a separate thread.

zsxwing commented May 17, 2014

@gsoltis Your code is right. The blocking OperatorToObservableFuture is a compromise since Future doest not provide a callback API.

Hi. I'm not familiar with Firebase but here are my comments about the RxJava usage.

L32-58: are the callbacks coming from a single thread? If not you should wrap the child subscriber with SerializedSubscriber and route calls through it. Same for L103-115.

FirebaseError: you could add the entire error object to the exception, not just the message.

Owner

gsoltis commented May 18, 2014

@zsxwing Good to know, thanks!

@akarnokd The callbacks do come from a single thread by default. It's possible for users to override that behavior, but I think they will then be on their own to modify these bindings.

Adding the FirebaseError to the exception is a good idea, I'll look into making that change. It might take a bit to do that one since I'll have to modify the Firebase SDK where FirebaseException is defined.

This looks really useful!
I was wondering if you could add some kind of licence if you are happy for other people to use this in their work?

Where are you getting Constants.EventType from? Is there someplace I could get access to those variables?

For anyone who should be interested, here is the missing enum declaration:

enum EventType {
    CHILD_ADDED, CHILD_CHANGED, CHILD_REMOVED, CHILD_MOVED
}

If anyone is interested I have forked this and made my own adjustments here: https://gist.github.com/TinasheMzondiwa/3b330c72a5b6625d66c2e1b63abaac9b

putt commented Oct 23, 2016

Will "new Observable.OnSubscribe" block cause some memory leak?
After call the .subscribe(), I can unsubscribe subscription in onDestory(). Then what happened to the running "new Observable.OnSubscribe" block ?

Hi, nice share @gssoltis.
But, how to do it in Java? It is android way.
Thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment