Skip to content

Instantly share code, notes, and snippets.

@andaag
Created October 14, 2015 11:55
Show Gist options
  • Save andaag/8372ba6745e2f2a6f8ed to your computer and use it in GitHub Desktop.
Save andaag/8372ba6745e2f2a6f8ed to your computer and use it in GitHub Desktop.
package no.finn.android.contentprovider;
import android.content.ContentResolver;
import android.database.ContentObserver;
import android.database.Cursor;
import android.net.Uri;
import android.os.Handler;
import android.os.Looper;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
import no.finn.android.FinnApplication;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action0;
import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;
public class RxContentProvider {
//NB NB NB : Caller is responsible for calling unsubscribe AND closing the cursor!!
private final static Handler contentObserverHandler = new Handler(Looper.getMainLooper());
private final static ContentResolver contentResolver = FinnApplication.instance().getContentResolver();
private final static Scheduler scheduler = Schedulers.io();
public static Observable<Cursor> createQuery(@NonNull final Uri uri, @Nullable final String[] projection,
@Nullable final String selection, @Nullable final String[] selectionArgs, @Nullable
final String sortOrder, final boolean notifyForDescendents) {
return Observable.create(new Observable.OnSubscribe<Cursor>() {
private Cursor getCursor() {
return contentResolver.query(uri, projection, selection, selectionArgs, sortOrder);
}
@Override
public void call(final Subscriber<? super Cursor> subscriber) {
subscriber.onNext(getCursor());
final ContentObserver observer = new ContentObserver(contentObserverHandler) {
@Override
public void onChange(boolean selfChange) {
runAction(() -> {
if (subscriber.isUnsubscribed()) {
return;
}
subscriber.onNext(getCursor());
}, scheduler);
}
};
contentResolver.registerContentObserver(uri, notifyForDescendents, observer);
subscriber.add(Subscriptions.create(() -> contentResolver.unregisterContentObserver(observer)));
}
}).onBackpressureBuffer().subscribeOn(scheduler).observeOn(AndroidSchedulers.mainThread());
}
private static void runAction(final Action0 action, Scheduler scheduler) {
final Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
action.call();
worker.unsubscribe();
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment