Last active
August 29, 2015 14:18
-
-
Save dhenry/58844bdb1058fbce6212 to your computer and use it in GitHub Desktop.
RxJava cache mechanism using Subjects
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import android.support.annotation.Nullable; | |
import org.jetbrains.annotations.NotNull; | |
import java.util.ArrayList; | |
import java.util.Date; | |
import java.util.List; | |
import java.util.concurrent.locks.ReentrantLock; | |
import rx.Observable; | |
import rx.schedulers.Schedulers; | |
import rx.subjects.AsyncSubject; | |
/** | |
* Provides infrastructure for retrieving network data, caching the response in an {@link AsyncSubject} | |
* and allowing consumers to subscribe to the cached response as an {@link rx.Observable}. | |
* | |
* Instances of this class are created via the {@link com.zappos.android.cache.CacheFactory}. | |
* | |
* @author dhenry | |
* @author gaara87 | |
* | |
* @param <T> the data object being retrieved/cached | |
*/ | |
class Cache<T> { | |
private static final Long DEFAULT_REFRESH_RATE = 10000l; // 10 seconds | |
private final ReentrantLock mLock = new ReentrantLock(); | |
private final List<Observable<T>> mDataRetrievers = new ArrayList<>(); | |
private final AsyncSubject<T> mSubject = AsyncSubject.create(); | |
private Long mRefreshRate = DEFAULT_REFRESH_RATE; | |
private Long mRefreshedAt; | |
/** | |
* Create a {@link Cache} with an {@link rx.Observable}. | |
* @param dataRetriever the data retriever | |
* @param refreshRate the cache refreshRate | |
*/ | |
protected Cache(@NotNull final Observable<T> dataRetriever, @Nullable Long refreshRate) { | |
this.mDataRetrievers.add(dataRetriever); | |
if (refreshRate != null) { | |
mRefreshRate = refreshRate; | |
} | |
} | |
/** | |
* Create a {@link Cache} with multiple data retriever {@link rx.Observable}s. | |
* @param dataRetrievers the data retrievers | |
* @param refreshRate the cache refreshRate | |
*/ | |
protected Cache(@NotNull List<Observable<T>> dataRetrievers, @Nullable Long refreshRate) { | |
this.mDataRetrievers.addAll(dataRetrievers); | |
if (refreshRate != null) { | |
mRefreshRate = refreshRate; | |
} | |
} | |
/** | |
* Asynchronously retrieve the data and store it internally in an {@link rx.subjects.AsyncSubject}. | |
*/ | |
private void fetchAsync() { | |
mLock.lock(); | |
try { | |
for (Observable<T> retriever : mDataRetrievers) { | |
retriever.subscribeOn(Schedulers.newThread()) | |
.subscribe(mSubject); | |
} | |
} finally { | |
mLock.unlock(); | |
} | |
} | |
/** | |
* Provide the observable for consumers and refresh data if necessary. | |
* @return Observable the observable | |
*/ | |
protected Observable<T> getSubject() { | |
if (requiresRefresh()) { | |
fetchAsync(); | |
} | |
return mSubject; | |
} | |
/** | |
* Checks if the cache needs to be refreshed. | |
* @return true if the cache needs to be refreshed, otherwise false | |
*/ | |
private boolean requiresRefresh() { | |
if (mRefreshedAt == null || (mRefreshedAt + mRefreshRate <= new Date().getTime())) { | |
mRefreshedAt = new Date().getTime(); | |
return true; | |
} | |
return false; | |
} | |
/** | |
* Replace the {@link Cache}s data retriever(s). | |
* @param dataRetriever the new data retriever | |
*/ | |
protected void setRetriever(Observable<T> dataRetriever) { | |
mLock.lock(); | |
try { | |
this.mDataRetrievers.clear(); | |
this.mDataRetrievers.add(dataRetriever); | |
} finally { | |
mLock.unlock(); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import android.support.annotation.Nullable; | |
import org.jetbrains.annotations.NotNull; | |
import java.util.HashMap; | |
import rx.Observable; | |
import rx.Subscriber; | |
import rx.android.schedulers.AndroidSchedulers; | |
/** | |
* Provides simplified creation, modification and access to {@link Cache} objects. | |
* | |
* @author dhenry | |
* @author gaara87 | |
*/ | |
public class CacheFactory { | |
/** | |
* The currently loaded caches in the format: | |
* cache name/cache return type -> cache object | |
*/ | |
private HashMap<String, Cache> mCaches = new HashMap<>(); | |
/** | |
* Looks in caches for a {@link Cache} identified by {@param type}. | |
* | |
* If a match is found and {@param override} is true, set {@param dataRetriever} as its new input. | |
* | |
* If no match is found, create a new {@link Cache} with {@param dataRetriever} | |
* as its input. | |
* | |
* If a match is found but {@param override} is false, return the match. | |
* | |
* @param type the {@link Cache} return type | |
* @param override whether to replace an existing data retriever with a new one | |
* @param dataRetriever the observable that the cache should use to retrieve data | |
* @param refreshRate the data refresh rate | |
* @param <T> the cache return type | |
*/ | |
public <T> void setCache(@NotNull Class<T> type, boolean override, @NotNull Observable<T> dataRetriever, | |
@Nullable final Long refreshRate) { | |
setCache(type.getSimpleName(), override, dataRetriever, refreshRate); | |
} | |
/** | |
* Looks in caches for a {@link Cache} identified by {@param key}. | |
* | |
* If a match is found and {@param override} is true, set {@param dataRetriever} as its new input. | |
* | |
* If no match is found, create a new {@link Cache} with {@param dataRetriever} | |
* as its input. | |
* | |
* If a match is found but {@param override} is false, return the match. | |
* | |
* @param override whether to replace an existing data retriever with a new one | |
* @param dataRetriever the observable that the cache should use to retrieve data | |
* @param refreshRate the data refresh rate | |
* @param <T> the cache return type | |
*/ | |
@SuppressWarnings("unchecked") | |
public <T> void setCache(@NotNull final String key, boolean override, @NotNull Observable<T> dataRetriever, | |
@Nullable final Long refreshRate) { | |
if (mCaches.containsKey(key) && override) { | |
mCaches.get(key).setRetriever(dataRetriever); | |
} else if (!mCaches.containsKey(key)) { | |
mCaches.put(key, new Cache<>(dataRetriever, refreshRate)); | |
} | |
} | |
/** | |
* Binds {@param subscriber} as an observer of a cache identified by {@param type}. | |
* @param type the {@link Cache} return type | |
* @param subscriber the subscriber to bind | |
* @param <T> the caches return type | |
*/ | |
public <T> void setSubscriber(@NotNull Class<T> type, @NotNull Subscriber<T> subscriber) { | |
setSubscriber(type.getSimpleName(), subscriber); | |
} | |
/** | |
* Binds {@param subscriber} as an observer of a cache identified by {@param key}. | |
* @param key the cache key | |
* @param subscriber the subscriber to bind | |
* @param <T> the caches return type | |
*/ | |
@SuppressWarnings("unchecked") | |
public <T> void setSubscriber(@NotNull final String key, @NotNull Subscriber<T> subscriber) { | |
if (mCaches.containsKey(key)) { | |
mCaches.get(key) | |
.getSubject() | |
.observeOn(AndroidSchedulers.mainThread()) | |
.subscribe(subscriber); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import android.content.Context; | |
import org.jetbrains.annotations.NotNull; | |
import java.lang.ref.WeakReference; | |
import rx.Subscriber; | |
/** | |
* Extend this to subscribe to {@link Cache} objects | |
* @param <T> the data entity type that we're interested in | |
* | |
* @author dhenry | |
* @author gaara87 | |
*/ | |
public abstract class CacheSubscriber<T> extends Subscriber<T> { | |
private WeakReference<Context> mContext; | |
public CacheSubscriber(@NotNull final Context context) { | |
this.mContext = new WeakReference<>(context); | |
} | |
@Override | |
final public void onCompleted() { | |
// nothing to do here! | |
} | |
@Override | |
final public void onError(Throwable e) { | |
if (mContext.get() != null) { | |
onFail(e); | |
} | |
} | |
@Override | |
final public void onNext(T result) { | |
if (mContext.get() != null) { | |
onSuccess(result); | |
} | |
} | |
/** | |
* Implement an on error handler. | |
* @param e an exception that was thrown | |
*/ | |
public abstract void onFail(Throwable e); | |
/** | |
* Implement a success handler. | |
* @param result the response object | |
*/ | |
public abstract void onSuccess(T result); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment