Skip to content

Instantly share code, notes, and snippets.

@roxrook
Forked from imminent/CursorSubject.java
Last active August 29, 2015 14:27
Show Gist options
  • Save roxrook/e5efe128696f6a859305 to your computer and use it in GitHub Desktop.
Save roxrook/e5efe128696f6a859305 to your computer and use it in GitHub Desktop.
CursorSubject is a Reactive Extension version of Android's CursorLoader. Handles retrieving the Cursor in a background thread, sending the result Cursor on the main thread, and resending a Cursor whenever the content changed notification is triggered.
package com.example.models;
import android.database.ContentObserver;
import android.database.Cursor;
import android.os.Handler;
import android.os.Looper;
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func2;
@ParametersAreNonnullByDefault
public abstract class CursorSubject implements Closeable {
public CursorSubject(Scheduler execute_on, final Scheduler observe_on) {
_last_cursor = new AtomicReference<>();
_cursor_channel = PublishSubject.create();
_execution_scheduler = execute_on;
_load_cursor_action = new Action0() {
@Override public void call() {
try {
final Cursor cursor = _cursor_retriever.retrieveCursor();
if (cursor != null) {
try {
// Ensure the cursor window is filled.
cursor.getCount();
cursor.registerContentObserver(_content_observer);
} catch (RuntimeException exception) {
cursor.close();
throw exception;
}
// Delivers resulting Cursor over Cursor channel
observe_on.schedule(cursor, _deliver_cursor_action);
}
} catch (Exception exception) {
_cursor_channel.onError(exception);
}
}
};
// Should make sure that the cursor is scheduled to be closed after the new one has been received
_deliver_cursor_action = new Func2<Scheduler, Cursor, Subscription>() {
@Override public Subscription call(Scheduler scheduler, Cursor cursor) {
_cursor_channel.onNext(cursor);
updateLastCursor(cursor);
return Subscriptions.empty();
}
};
// Schedules the first cursor retrieval (subsequent retrievals occur when content changes)
loadCursor();
}
/* Public API */
public Subscription subscribe(Observer<? super Cursor> observer) {
return _cursor_channel.subscribe(observer);
}
public Subscription subscribe(Action1<? super Cursor> on_next) {
return _cursor_channel.subscribe(on_next);
}
/**
* Retrieves the {@link android.database.Cursor}.
*/
@Nullable protected abstract Cursor retrieveCursor();
/* Closeable */
@Override public void close() {
_cursor_channel.onCompleted();
updateLastCursor(null);
}
/* package */void updateLastCursor(@Nullable Cursor cursor) {
// Makes sure to close cursor
final Cursor last_cursor = _last_cursor.getAndSet(cursor);
if (last_cursor != null) last_cursor.close();
}
/* package */void loadCursor() {
// Schedules Cursor loading on execution scheduler
_execution_scheduler.schedule(_load_cursor_action);
}
/**
* An implementation of a ContentObserver that takes care of connecting
* it to re-load the data when the observer is told it has changed.
*/
private final class ForceLoadContentObserver extends ContentObserver {
public ForceLoadContentObserver() {
super(new Handler(Looper.myLooper()));
}
@Override public boolean deliverSelfNotifications() {
return true;
}
@Override public void onChange(boolean _) {
loadCursor();
}
}
private final Scheduler _execution_scheduler;
/* package */final Action0 _load_cursor_action;
/* package */final Func2<? super Scheduler, ? super Cursor, ? extends Subscription> _deliver_cursor_action;
/* package */final AtomicReference<Cursor> _last_cursor;
/* package */final PublishSubject<Cursor> _cursor_channel;
/* package */final ForceLoadContentObserver _content_observer = new ForceLoadContentObserver();
}
package com.example;
import android.app.ListActivity;
import android.database.Cursor;
import android.net.Uri;
import android.os.Bundle;
import android.widget.CursorAdapter;
import android.widget.SimpleCursorAdapter;
import com.example.models.CursorSubject;
import rx.Observer;
import static rx.schedulers.Schedulers.io;
import static rx.android.schedulers.AndroidSchedulers.mainThread;
public class ExampleActivity extends ListActivity implements Observer<Cursor> {
/* Activity callbacks */
@Override protected onCreate(Bundle _) {
new CursorSubject(io(), mainThread()) {
@Override
@Nullable public Cursor retrieveCursor() {
return getContentResolver().query(Uri.EMPTY, null, null, null, null);;
}.subscribe(this);
/* Cursor Observer */
@Override public void onNext(Cursor cursor) {
if (cursor.isClosed()) return;
final CursorAdapter adapter = (CursorAdapter) getListAdapter();
if (adapter != null) adapter.swapCursor(cursor);
else setListAdapter(new SimpleCursorAdapter(this, android.R.layout.simple_list_item_1, cursor));
}
@Override public void onCompleted() { }
@Override public void onError(Throwable _) { }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment