Skip to content

Instantly share code, notes, and snippets.

@dhenry
Last active August 29, 2015 14:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dhenry/58844bdb1058fbce6212 to your computer and use it in GitHub Desktop.
Save dhenry/58844bdb1058fbce6212 to your computer and use it in GitHub Desktop.
RXJava cache mechanism using Subjects
import android.support.annotation.Nullable;
import org.jetbrains.annotations.NotNull;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import rx.Observable;
import rx.schedulers.Schedulers;
import rx.subjects.AsyncSubject;
/**
* Provides infrastructure for retrieving network data, caching the response in an {@link AsyncSubject}
* and allowing consumers to subscribe to the cached response as an {@link rx.Observable}.
*
* Instances of this class are created via the {@link com.zappos.android.cache.CacheFactory}.
*
* @author dhenry
* @author gaara87
*
* @param <T> the data object being retrieved/cached
*/
class Cache<T> {
private static final Long DEFAULT_REFRESH_RATE = 10000l; // 10 seconds
private final ReentrantLock mLock = new ReentrantLock();
private final List<Observable<T>> mDataRetrievers = new ArrayList<>();
private final AsyncSubject<T> mSubject = AsyncSubject.create();
private Long mRefreshRate = DEFAULT_REFRESH_RATE;
private Long mRefreshedAt;
/**
* Create a {@link Cache} with an {@link rx.Observable}.
* @param dataRetriever the data retriever
* @param refreshRate the cache refreshRate
*/
protected Cache(@NotNull final Observable<T> dataRetriever, @Nullable Long refreshRate) {
this.mDataRetrievers.add(dataRetriever);
if (refreshRate != null) {
mRefreshRate = refreshRate;
}
}
/**
* Create a {@link Cache} with multiple data retriever {@link rx.Observable}s.
* @param dataRetrievers the data retrievers
* @param refreshRate the cache refreshRate
*/
protected Cache(@NotNull List<Observable<T>> dataRetrievers, @Nullable Long refreshRate) {
this.mDataRetrievers.addAll(dataRetrievers);
if (refreshRate != null) {
mRefreshRate = refreshRate;
}
}
/**
* Asynchronously retrieve the data and store it internally in an {@link rx.subjects.AsyncSubject}.
*/
private void fetchAsync() {
mLock.lock();
try {
for (Observable<T> retriever : mDataRetrievers) {
retriever.subscribeOn(Schedulers.newThread())
.subscribe(mSubject);
}
} finally {
mLock.unlock();
}
}
/**
* Provide the observable for consumers and refresh data if necessary.
* @return Observable the observable
*/
protected Observable<T> getSubject() {
if (requiresRefresh()) {
fetchAsync();
}
return mSubject;
}
/**
* Checks if the cache needs to be refreshed.
* @return true if the cache needs to be refreshed, otherwise false
*/
private boolean requiresRefresh() {
if (mRefreshedAt == null || (mRefreshedAt + mRefreshRate <= new Date().getTime())) {
mRefreshedAt = new Date().getTime();
return true;
}
return false;
}
/**
* Replace the {@link Cache}s data retriever(s).
* @param dataRetriever the new data retriever
*/
protected void setRetriever(Observable<T> dataRetriever) {
mLock.lock();
try {
this.mDataRetrievers.clear();
this.mDataRetrievers.add(dataRetriever);
} finally {
mLock.unlock();
}
}
}
import android.support.annotation.Nullable;
import org.jetbrains.annotations.NotNull;
import java.util.HashMap;
import rx.Observable;
import rx.Subscriber;
import rx.android.schedulers.AndroidSchedulers;
/**
* Provides simplified creation, modification and access to {@link Cache} objects.
*
* @author dhenry
* @author gaara87
*/
public class CacheFactory {
/**
* The currently loaded caches in the format:
* cache name/cache return type -> cache object
*/
private HashMap<String, Cache> mCaches = new HashMap<>();
/**
* Looks in caches for a {@link Cache} identified by {@param type}.
*
* If a match is found and {@param override} is true, set {@param dataRetriever} as its new input.
*
* If no match is found, create a new {@link Cache} with {@param dataRetriever}
* as its input.
*
* If a match is found but {@param override} is false, return the match.
*
* @param type the {@link Cache} return type
* @param override whether to replace an existing data retriever with a new one
* @param dataRetriever the observable that the cache should use to retrieve data
* @param refreshRate the data refresh rate
* @param <T> the cache return type
*/
public <T> void setCache(@NotNull Class<T> type, boolean override, @NotNull Observable<T> dataRetriever,
@Nullable final Long refreshRate) {
setCache(type.getSimpleName(), override, dataRetriever, refreshRate);
}
/**
* Looks in caches for a {@link Cache} identified by {@param key}.
*
* If a match is found and {@param override} is true, set {@param dataRetriever} as its new input.
*
* If no match is found, create a new {@link Cache} with {@param dataRetriever}
* as its input.
*
* If a match is found but {@param override} is false, return the match.
*
* @param override whether to replace an existing data retriever with a new one
* @param dataRetriever the observable that the cache should use to retrieve data
* @param refreshRate the data refresh rate
* @param <T> the cache return type
*/
@SuppressWarnings("unchecked")
public <T> void setCache(@NotNull final String key, boolean override, @NotNull Observable<T> dataRetriever,
@Nullable final Long refreshRate) {
if (mCaches.containsKey(key) && override) {
mCaches.get(key).setRetriever(dataRetriever);
} else if (!mCaches.containsKey(key)) {
mCaches.put(key, new Cache<>(dataRetriever, refreshRate));
}
}
/**
* Binds {@param subscriber} as an observer of a cache identified by {@param type}.
* @param type the {@link Cache} return type
* @param subscriber the subscriber to bind
* @param <T> the caches return type
*/
public <T> void setSubscriber(@NotNull Class<T> type, @NotNull Subscriber<T> subscriber) {
setSubscriber(type.getSimpleName(), subscriber);
}
/**
* Binds {@param subscriber} as an observer of a cache identified by {@param key}.
* @param key the cache key
* @param subscriber the subscriber to bind
* @param <T> the caches return type
*/
@SuppressWarnings("unchecked")
public <T> void setSubscriber(@NotNull final String key, @NotNull Subscriber<T> subscriber) {
if (mCaches.containsKey(key)) {
mCaches.get(key)
.getSubject()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
}
}
}
import android.content.Context;
import org.jetbrains.annotations.NotNull;
import java.lang.ref.WeakReference;
import rx.Subscriber;
/**
* Extend this to subscribe to {@link Cache} objects
* @param <T> the data entity type that we're interested in
*
* @author dhenry
* @author gaara87
*/
public abstract class CacheSubscriber<T> extends Subscriber<T> {
private WeakReference<Context> mContext;
public CacheSubscriber(@NotNull final Context context) {
this.mContext = new WeakReference<>(context);
}
@Override
final public void onCompleted() {
// nothing to do here!
}
@Override
final public void onError(Throwable e) {
if (mContext.get() != null) {
onFail(e);
}
}
@Override
final public void onNext(T result) {
if (mContext.get() != null) {
onSuccess(result);
}
}
/**
* Implement an on error handler.
* @param e an exception that was thrown
*/
public abstract void onFail(Throwable e);
/**
* Implement a success handler.
* @param result the response object
*/
public abstract void onSuccess(T result);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment