Skip to content

Instantly share code, notes, and snippets.

@imminent
Last active August 11, 2016 17:21
Show Gist options
  • Star 20 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save imminent/8511925 to your computer and use it in GitHub Desktop.
Save imminent/8511925 to your computer and use it in GitHub Desktop.
CursorSubject is a Reactive Extension version of Android's CursorLoader. Handles retrieving the Cursor in a background thread, sending the result Cursor on the main thread, and resending a Cursor whenever the content changed notification is triggered.
package com.example.models;
import android.database.ContentObserver;
import android.database.Cursor;
import android.os.Handler;
import android.os.Looper;
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func2;
@ParametersAreNonnullByDefault
public abstract class CursorSubject implements Closeable {
public CursorSubject(Scheduler execute_on, final Scheduler observe_on) {
_last_cursor = new AtomicReference<>();
_cursor_channel = PublishSubject.create();
_execution_scheduler = execute_on;
_load_cursor_action = new Action0() {
@Override public void call() {
try {
final Cursor cursor = _cursor_retriever.retrieveCursor();
if (cursor != null) {
try {
// Ensure the cursor window is filled.
cursor.getCount();
cursor.registerContentObserver(_content_observer);
} catch (RuntimeException exception) {
cursor.close();
throw exception;
}
// Delivers resulting Cursor over Cursor channel
observe_on.schedule(cursor, _deliver_cursor_action);
}
} catch (Exception exception) {
_cursor_channel.onError(exception);
}
}
};
// Should make sure that the cursor is scheduled to be closed after the new one has been received
_deliver_cursor_action = new Func2<Scheduler, Cursor, Subscription>() {
@Override public Subscription call(Scheduler scheduler, Cursor cursor) {
_cursor_channel.onNext(cursor);
updateLastCursor(cursor);
return Subscriptions.empty();
}
};
// Schedules the first cursor retrieval (subsequent retrievals occur when content changes)
loadCursor();
}
/* Public API */
public Subscription subscribe(Observer<? super Cursor> observer) {
return _cursor_channel.subscribe(observer);
}
public Subscription subscribe(Action1<? super Cursor> on_next) {
return _cursor_channel.subscribe(on_next);
}
/**
* Retrieves the {@link android.database.Cursor}.
*/
@Nullable protected abstract Cursor retrieveCursor();
/* Closeable */
@Override public void close() {
_cursor_channel.onCompleted();
updateLastCursor(null);
}
/* package */void updateLastCursor(@Nullable Cursor cursor) {
// Makes sure to close cursor
final Cursor last_cursor = _last_cursor.getAndSet(cursor);
if (last_cursor != null) last_cursor.close();
}
/* package */void loadCursor() {
// Schedules Cursor loading on execution scheduler
_execution_scheduler.schedule(_load_cursor_action);
}
/**
* An implementation of a ContentObserver that takes care of connecting
* it to re-load the data when the observer is told it has changed.
*/
private final class ForceLoadContentObserver extends ContentObserver {
public ForceLoadContentObserver() {
super(new Handler(Looper.myLooper()));
}
@Override public boolean deliverSelfNotifications() {
return true;
}
@Override public void onChange(boolean _) {
loadCursor();
}
}
private final Scheduler _execution_scheduler;
/* package */final Action0 _load_cursor_action;
/* package */final Func2<? super Scheduler, ? super Cursor, ? extends Subscription> _deliver_cursor_action;
/* package */final AtomicReference<Cursor> _last_cursor;
/* package */final PublishSubject<Cursor> _cursor_channel;
/* package */final ForceLoadContentObserver _content_observer = new ForceLoadContentObserver();
}
package com.example;
import android.app.ListActivity;
import android.database.Cursor;
import android.net.Uri;
import android.os.Bundle;
import android.widget.CursorAdapter;
import android.widget.SimpleCursorAdapter;
import com.example.models.CursorSubject;
import rx.Observer;
import static rx.schedulers.Schedulers.io;
import static rx.android.schedulers.AndroidSchedulers.mainThread;
public class ExampleActivity extends ListActivity implements Observer<Cursor> {
/* Activity callbacks */
@Override protected onCreate(Bundle _) {
new CursorSubject(io(), mainThread()) {
@Override
@Nullable public Cursor retrieveCursor() {
return getContentResolver().query(Uri.EMPTY, null, null, null, null);;
}.subscribe(this);
/* Cursor Observer */
@Override public void onNext(Cursor cursor) {
if (cursor.isClosed()) return;
final CursorAdapter adapter = (CursorAdapter) getListAdapter();
if (adapter != null) adapter.swapCursor(cursor);
else setListAdapter(new SimpleCursorAdapter(this, android.R.layout.simple_list_item_1, cursor));
}
@Override public void onCompleted() { }
@Override public void onError(Throwable _) { }
}
@nvanbenschoten
Copy link

It doesn't look like this implementation is still supported by recent changes to the RxJava API (ie. Scheduler). Are you still using this code, or if not, have you found a better solution to the problem?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment