Skip to content

Instantly share code, notes, and snippets.

@cemo
Forked from austynmahoney/CachedRefreshable.java
Created September 22, 2015 20:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cemo/4e53aba217b2b782e709 to your computer and use it in GitHub Desktop.
Save cemo/4e53aba217b2b782e709 to your computer and use it in GitHub Desktop.
RxJava Refreshable Pattern
/**
 * A {@link Refreshable} that can serve data from a cache before falling back to the original source, and that writes
 * every value obtained from the source back into the cache.
 *
 * @param <P> type of the parameters identifying a request
 * @param <T> type of the data being observed
 */
public abstract class CachedRefreshable<P, T> extends Refreshable<P, T> {

    /**
     * Return the Observable that fetches data from the original (non-cached) source.
     */
    protected abstract Observable<T> getSourceObservable(P parameters);

    /**
     * Return the Observable that reads data from the cache.
     *
     * @return Observable backed by the cached item, or null when the cache misses.
     */
    protected abstract Observable<T> getCachedObservable(P parameters);

    /**
     * Callback invoked with each value acquired from the source so that it can be stored in the cache.
     */
    protected abstract void updateCache(P parameters, T data);

    /**
     * Accepts data produced by some other source: the value is first written to the cache and then pushed to the
     * internal observer so subscribers see it.
     */
    public void setValue(P parameters, T data) {
        updateCache(parameters, data);
        mInternalObserver.onNext(data);
    }

    /**
     * @return Cache-enabled Observable for a request that takes no parameters (null is used as the parameters).
     */
    public Observable<T> getObservable() {
        return getObservable(null);
    }

    /**
     * @param parameters Parameters for the action being observed by this Refreshable.
     * @return Cache-enabled Observable for the given parameters.
     */
    public Observable<T> getObservable(P parameters) {
        return getObservable(parameters, true);
    }

    /**
     * Provides an {@link Observable} that never calls {@link rx.Observer#onCompleted() Observer#onComplete}, so
     * observers can stay subscribed even when the underlying {@link Observable} changes (e.g. network request or
     * cached data).
     *
     * @param parameters Parameters for the action being performed by this Refreshable; also the key under which the
     *                   source subscription is tracked.
     * @param allowCache Whether to check for a cached copy of the data being observed.
     */
    public Observable<T> getObservable(P parameters, boolean allowCache) {
        // The cache is consulted only when the caller allows it.
        final Observable<T> fromCache = allowCache ? getCachedObservable(parameters) : null;
        if (fromCache != null) {
            // Cache hit: relay the cached value(s) through the internal observer.
            fromCache.subscribe(mInternalObserver);
            return mSubject;
        }

        // Cache miss or cache disallowed: start a source request unless one is still active for these parameters.
        final Subscription current = mSourceSubscriptions.get(parameters);
        final boolean sourceActive = current != null && !current.isUnsubscribed();
        if (!sourceActive) {
            updateSubscription(parameters);
        }
        return mSubject;
    }

    /**
     * Subscribes the internal observer to the source Observable, caching every emitted value on the way, and records
     * the resulting subscription under the given parameters.
     */
    @Override
    protected void updateSubscription(final P parameters) {
        final Action1<T> storeInCache = new Action1<T>() {
            @Override
            public void call(T value) {
                updateCache(parameters, value);
            }
        };

        final Subscription sourceSubscription = getSourceObservable(parameters)
                .doOnNext(storeInCache)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(mInternalObserver);

        mSourceSubscriptions.put(parameters, sourceSubscription);
    }
}
/**
 * {@link CachedRefreshable} for the current user's {@link UserProfile}. It takes no parameters ({@code Void}); the
 * cache is delegated to the injected {@code UserManager}.
 */
@Singleton
public class CurrentUserCachedRefreshable extends CachedRefreshable<Void, UserProfile> {

    @Inject UserManager mUserManager;

    @Inject
    public CurrentUserCachedRefreshable() {
    }

    // NOTE(review): neither CachedRefreshable nor Refreshable, as shown in this file, declares isCompletable();
    // this @Override only compiles if a superclass version exists elsewhere - confirm.
    @Override
    protected boolean isCompletable() {
        return true;
    }

    /**
     * Fetches the current user from the service and unwraps the {@link UserProfile} from the response.
     */
    @Override
    protected Observable<UserProfile> getSourceObservable(Void parameters) {
        final Func1<UserResponse, UserProfile> toProfile = new Func1<UserResponse, UserProfile>() {
            @Override
            public UserProfile call(UserResponse response) {
                return response.userProfile;
            }
        };
        // NOTE(review): mWelbeService is not declared in this class or in the visible superclasses; presumably an
        // injected Retrofit service field was dropped when this snippet was published - confirm.
        return mWelbeService.getCurrentUser().map(toProfile);
    }

    /**
     * Reads the profile Observable from the {@code UserManager}.
     */
    @Override
    protected Observable<UserProfile> getCachedObservable(Void parameters) {
        return mUserManager.getUserProfileObservable();
    }

    /**
     * Hands freshly acquired profile data to the {@code UserManager}.
     */
    @Override
    protected void updateCache(Void parameters, UserProfile data) {
        mUserManager.updateUserProfile(data);
    }
}
/**
 * {@link Refreshable} that submits a batch of {@link LogActivityRequest}s for the current user and emits each
 * {@link LoggingResponse} as it arrives.
 */
public class LogActivityRefreshable extends Refreshable<List<LogActivityRequest>, LoggingResponse> {

    /* Retrofit web service interface */
    @Inject WebService mWebService;

    // NOTE(review): the only current-user class visible in this file is CurrentUserCachedRefreshable;
    // confirm that CurrentUserRefreshable is the intended type.
    @Inject CurrentUserRefreshable mCurrentUserRefreshable;

    /**
     * Builds an Observable that waits for one user profile and then fires one web service call per request, merging
     * all responses into a single stream.
     *
     * @param requests the activities to log; one web service call is made per element
     * @return merged stream of the responses to all requests
     */
    @Override
    protected Observable<LoggingResponse> getSourceObservable(final List<LogActivityRequest> requests) {
        return mCurrentUserRefreshable.getObservable()
                // The user Observable is built to stay open and re-emit whenever the profile changes. Only one
                // profile is needed to obtain the user ID; without take(1) every later profile emission would
                // re-send the whole batch of log requests, and this stream could never complete.
                .take(1)
                .flatMap(new Func1<UserProfile, Observable<LoggingResponse>>() {
                    @Override
                    public Observable<LoggingResponse> call(UserProfile userProfile) {
                        // We need the user ID before we continue
                        List<Observable<LoggingResponse>> observables = new ArrayList<>(requests.size());
                        // Create a Retrofit web service Observable for each request
                        for (LogActivityRequest request : requests) {
                            observables.add(mWebService.logActivity(userProfile.id, request));
                        }
                        return Observable.merge(observables);
                    }
                });
    }
}
/**
 * Base class for the "refreshable" pattern: observers subscribe once to a long-lived subject, while the underlying
 * source Observable (for example a network request) can be re-run on demand via {@link #refreshFromSource(Object)}.
 *
 * @param <P> type of the parameters identifying a request; also used as the key for tracking subscriptions
 * @param <T> type of the data being observed
 */
public abstract class Refreshable<P, T> {

    /**
     * Holds source subscriptions keyed by their parameters. A {@link HashMap} is required here because
     * {@link #getObservable()} uses {@code null} as the key. The map is not synchronized.
     */
    protected Map<P, Subscription> mSourceSubscriptions = new HashMap<>();

    /** Subject handed out to external observers; everything the internal observer receives is relayed into it. */
    protected final Subject<T, T> mSubject = BehaviorSubject.create();

    /** Bridges source Observables into {@link #mSubject}, dropping completion so the subject stays open. */
    protected final Observer<T> mInternalObserver = new Observer<T>() {
        @Override
        public void onCompleted() {
            // internalObserver's job is to skip onComplete events.
        }

        @Override
        public void onError(Throwable e) {
            mSubject.onError(e);
        }

        @Override
        public void onNext(T data) {
            mSubject.onNext(data);
        }
    };

    /**
     * Return the Observable that gets data from an original source, such as a network or database. Useful with
     * Retrofit, where Observables are returned.
     */
    protected abstract Observable<T> getSourceObservable(P parameters);

    /**
     * @return Observable for a request that takes no parameters (null is used as the parameters).
     */
    public Observable<T> getObservable() {
        return getObservable(null);
    }

    /**
     * Provides an {@link Observable} that will never call {@link rx.Observer#onCompleted() Observer#onComplete},
     * leaving the subscription open all the time. <p>This allows all observers to always be subscribed, even if the
     * underlying {@link Observable} changes (e.g. network request or cached data).
     *
     * <p>A source request is started only when none is currently active for the given parameters, and the request is
     * tracked so that {@link #refreshFromSource(Object)} can cancel it.
     *
     * @param parameters Parameters for the action being performed by this Refreshable. This is used as a key to
     *                   track the source subscription.
     */
    public Observable<T> getObservable(P parameters) {
        Subscription subscription = mSourceSubscriptions.get(parameters);
        if (subscription == null || subscription.isUnsubscribed()) {
            // No request in flight for these parameters: start (and track) one.
            updateSubscription(parameters);
        }
        return mSubject;
    }

    /**
     * Cancels any active source request for the given parameters and starts a fresh one.
     */
    public void refreshFromSource(P parameters) {
        unsubscribe(parameters);
        updateSubscription(parameters);
    }

    /**
     * Stops tracking the subscription stored for the given parameters and unsubscribes it if it is still active.
     */
    private void unsubscribe(P parameters) {
        // Always drop the map entry, even if the subscription already finished, so stale entries do not accumulate.
        Subscription subscription = mSourceSubscriptions.remove(parameters);
        if (subscription != null && !subscription.isUnsubscribed()) {
            subscription.unsubscribe();
        }
    }

    /**
     * Subscribes the internal observer to the source Observable and records the subscription under the given
     * parameters.
     */
    protected void updateSubscription(final P parameters) {
        // Subscribe to the SOURCE, not to getObservable(): the latter returns mSubject, and feeding the subject back
        // into the observer that publishes to it would create a feedback loop.
        Subscription subscription = getSourceObservable(parameters)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(mInternalObserver);
        mSourceSubscriptions.put(parameters, subscription);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment