Skip to content

Instantly share code, notes, and snippets.

Last active September 4, 2015 01:15
Show Gist options
  • Save digitalbuddha/8d64fb03844b286e1f37 to your computer and use it in GitHub Desktop.
Save digitalbuddha/8d64fb03844b286e1f37 to your computer and use it in GitHub Desktop.
package net.danlew.sample;
import rx.Observable;
* Simulates three different sources - one from memory, one from disk,
* and one from network. In reality, they're all in-memory, but let's
* play pretend.
* Observable.create() is used so that we always return the latest data
* to the subscriber; if you use just() it will only return the data from
* a certain point in time.
public class BaseDAO<T> {
// Memory cache of data
private T memory = null;
// What's currently "written" on disk
private T disk = null;
private BehaviorSubject<T> updateStream;
public Observable<T> get()
Observable<Data> source = Observable.concat(
public Observable<T> fresh(){
// In order to simulate memory being cleared, but data still on disk
public void clearMemory() {
System.out.println("Wiping memory...");
memory = null;
//disk = null; not sure why you would ever want to delete instead of replace
private Observable<T> memory() {
return Observable.create(subscriber -> {
private Observable<T> disk() {
Observable<Data> observable = Observable.create(subscriber -> {
// Cache disk responses in memory
return observable.doOnNext(t -> memory = t)
private Observable<T> network() {
return Observable.create(subscriber -> {
subscriber.onNext(new NetworkRequest());
}).doOnNext(data -> {
disk = data;
memory = data;
public Observable<T> update(){
return updateStream.asObservable();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment