Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
A Presenter when using RxAndroid which delays delivering to the View when the View isn't ready
/*
* The MIT License (MIT)
*
* Copyright (c) 2014 Konstantin Mikheev sirstripy-at-gmail-com
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package net.grandcentrix.rx;
import java.util.ArrayList;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
/**
* This operator delays onNext, onComplete and onError emissions until a True value received from a
* given observable. When the given observable emits False, the operator starts delaying emissions
* again.
* <p/>
* semaphoreLatest variant drops older not emitted onNext value if a new value has been received.
* <p/>
* semaphoreLatestCache keeps the latest value after emission and sends it on each True value from a
* given observable received. This variant never emits onCompleted.
*
* @param <T> a type of onNext value
* @author konmik https://github.com/konmik/nucleus/blob/53ecd398cec6e85b58545b4d30cfa961470d9f68/nucleus-example-with-tests/src/main/java/nucleus/example/main/MainPresenter.java
*/
public class OperatorSemaphore<T> implements Observable.Operator<T, T> {
private boolean cache;
private Observable<Boolean> go;
private boolean latest;
private OperatorSemaphore(Observable<Boolean> go) {
this.go = go;
}
private OperatorSemaphore(Observable<Boolean> go, boolean latest) {
this.go = go;
this.latest = latest;
}
private OperatorSemaphore(Observable<Boolean> go, boolean latest, boolean cache) {
this.go = go;
this.latest = latest;
this.cache = cache;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
return new Subscriber<T>() {
boolean deliverCompleted;
boolean deliverError;
Throwable error;
boolean hasCache;
boolean isOpen;
ArrayList<T> next = new ArrayList<>();
T nextCache;
@Override
public void onCompleted() {
if (!cache) {
deliverCompleted = true;
tick(false);
}
}
@Override
public void onError(Throwable e) {
error = e;
deliverError = true;
tick(false);
}
@Override
public void onNext(T o) {
if (latest) {
next.clear();
}
next.add(o);
tick(false);
}
@Override
public void onStart() {
super.onStart();
add(go.subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean aBoolean) {
isOpen = aBoolean;
tick(cache);
}
}));
child.add(this);
}
void tick(boolean deliverCache) {
if (!isUnsubscribed() && isOpen) {
while (next.size() > 0) {
T value = next.remove(0);
child.onNext(value);
deliverCache = false;
if (cache) {
nextCache = value;
hasCache = true;
}
}
if (deliverCache && hasCache) {
child.onNext(nextCache);
}
if (deliverCompleted) {
child.onCompleted();
unsubscribe();
}
if (deliverError) {
child.onError(error);
unsubscribe();
}
}
}
};
}
/**
* Returns an operator that delays onNext, onComplete and onError emissions until a True value
* received from a given observable. When the given observable emits False, the operator starts
* delaying emissions again.
*
* @param go an operator that controls emission.
* @param <T> a type of onNext value.
* @return an operator that delays onNext, onComplete and onError emissions until a True value
* received from a given observable. When the given observable emits False, the operator starts
* delaying emissions again.
*/
public static <T> OperatorSemaphore<T> semaphore(Observable<Boolean> go) {
return new OperatorSemaphore<>(go);
}
/**
* Returns an operator that delays onNext, onComplete and onError emissions until a True value
* received from a given observable. When the given observable emits False, the operator starts
* delaying emissions again.
* <p/>
* This variant drops older not emitted value if a new value has been received.
*
* @param go an operator that controls emission.
* @param <T> a type of onNext value.
* @return an operator that delays onNext, onComplete and onError emissions until a True value
* received from a given observable. When the given observable emits False, the operator starts
* delaying emissions again.
* <p/>
* This variant drops older not emitted value if a new value has been received.
*/
public static <T> OperatorSemaphore<T> semaphoreLatest(Observable<Boolean> go) {
return new OperatorSemaphore<>(go, true);
}
/**
* Returns an operator that delays onNext, onComplete and onError emissions until a True value
* received from a given observable. When the given observable emits False, the operator starts
* delaying emissions again.
* <p/>
* This variant drops older not emitted value if a new value has been received.
* <p/>
* It also keeps the latest value after emission and sends it on each True value from a given
* observable received. This variant never emits onCompleted.
*
* @param go an operator that controls emission.
* @param <T> a type of onNext value.
* @return an operator that delays onNext, onComplete and onError emissions until a True value
* received from a given observable. When the given observable emits False, the operator starts
* delaying emissions again.
* <p/>
* This variant drops older not emitted value if a new value has been received.
* <p/>
* It also keeps the latest value after emission and sends it on each True value from a given
* observable received. This variant never emits onCompleted.
*/
public static <T> OperatorSemaphore<T> semaphoreLatestCache(Observable<Boolean> go) {
return new OperatorSemaphore<>(go, true, true);
}
}
package net.grandcentrix.rx;
import android.os.Bundle;
import android.support.v4.app.Fragment;
import android.view.LayoutInflater;
import android.view.View;
import android.view.ViewGroup;
import net.grandcentrix.rx.OperatorSemaphore;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subjects.BehaviorSubject;
import rx.subscriptions.CompositeSubscription;
/**
* Represents the Presenter of the popular Model-View-Presenter design pattern.
* <p/>
* The presenter connects the View V to a model which don't know each other. The View is passive and
* provides this Presenter with events from the UI. It's an RxPresenter because it works with {@link
* rx.Observable} from RxJava to communicate with the View.
* <p/>
* Created by pascalwelsch on 4/17/15.
*/
public abstract class RxPresenter<V> {
private final V mView;
private CompositeSubscription mUiSubscriptions = new CompositeSubscription();
private BehaviorSubject<Boolean> mViewReady = BehaviorSubject.create(false);
public RxPresenter(final V view) {
mView = view;
}
/**
* @return the view of this presenter
*/
protected V getView() {
return mView;
}
/**
* add your subscriptions for View events to this method to get them automatically cleaned up in
* {@link #sleep()}. typically call this in {@link #wakeUp()} where you subscribe to the UI
* events
*/
protected void manageViewSubscription(final Subscription subscription) {
mUiSubscriptions.add(subscription);
}
/**
* Returns a transformer that will delay onNext, onError and onComplete emissions unless a view
* become available. getView() is guaranteed to be != null during all emissions. This
* transformer can only be used on application's main thread.
* <p/>
* If the transformer receives a next value while the previous value has not been delivered, the
* previous value will be dropped.
* <p/>
* The transformer will duplicate the latest onNext emission in case if a view has been
* reattached.
* <p/>
* This operator ignores onComplete emission and never sends one.
* <p/>
* Use this operator when you need to show updatable data that needs to be cached in memory.
*
* @param <T> a type of onNext value.
* @return the delaying operator.
*/
public <T> Observable.Transformer<T, T> deliverLatestCacheToView() {
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> observable) {
return observable.lift(OperatorSemaphore.<T>semaphoreLatestCache(isViewReady()));
}
};
}
/**
* Returns a transformer that will delay onNext, onError and onComplete emissions unless a view
* become available. getView() is guaranteed to be != null during all emissions. This
* transformer can only be used on application's main thread.
* <p/>
* If this transformer receives a next value while the previous value has not been delivered,
* the previous value will be dropped.
* <p/>
* Use this operator when you need to show updatable data.
*
* @param <T> a type of onNext value.
* @return the delaying operator.
*/
public <T> Observable.Transformer<T, T> deliverLatestToView() {
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> observable) {
return observable.lift(OperatorSemaphore.<T>semaphoreLatest(isViewReady()));
}
};
}
/**
* Returns a transformer that will delay onNext, onError and onComplete emissions unless a view
* become available. getView() is guaranteed to be != null during all emissions. This
* transformer can only be used on application's main thread.
* <p/>
* Use this operator if you need to deliver *all* emissions to a view, in example when you're
* sending items into adapter one by one.
*
* @param <T> a type of onNext value.
* @return the delaying operator.
*/
public <T> Observable.Transformer<T, T> deliverToView() {
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> observable) {
return observable.lift(OperatorSemaphore.<T>semaphore(isViewReady()));
}
};
}
/**
* completes all observables of this presenter. Should be called when the view is about to die
* and will never come back.
* <p/>
* call this in {@link Fragment#onDestroy()}
* <p/>
* complete all {@link rx.Observer}, i.e. BehaviourSubjects with {@link Observer#onCompleted()}
* to unsubscribe all observers
*/
public void destroy() {
mViewReady.onNext(false);
}
/**
* call sleep as the opposite of {@link #wakeUp()} to unsubscribe all observers listening to the
* UI observables of the view. Calling sleep in {@link Fragment#onDestroyView()} makes sense
* because observing a discarded view does not.
*/
public void sleep() {
mViewReady.onNext(false);
// unsubscribe all UI subscriptions created in wakeUp() and added
// via manageViewSubscription(Subscription)
mUiSubscriptions.unsubscribe();
// there is no reuse possible. recreation works fine
mUiSubscriptions = new CompositeSubscription();
}
/**
* when calling wakeUp the presenter starts to observe the observables of the View.
* <p/>
* Call this in a Fragment after {@link Fragment#onCreateView(LayoutInflater, ViewGroup,
* Bundle)} and after you created and published all observables the presenter will use. At the
* end of {@link Fragment#onViewCreated(View, Bundle)} is an appropriate place.
*/
public void wakeUp() {
mViewReady.onNext(true);
}
/**
* Observable of the view state. The View is ready to receive calls after calling {@link
* #wakeUp()} and before calling {@link #sleep()}.
*/
private Observable<Boolean> isViewReady() {
return mViewReady.asObservable().distinctUntilChanged();
}
}
/**
* usage of the RxPresenter in a Fragment
*/
public class ViewFragment extends Fragment {
private RxPresenter mPresenter;
@Override
public void onAttach(final Activity activity) {
super.onAttach(activity);
if (mPresenter == null) {
mPresenter = new SomePresenter(this, activity.getApplicationContext());
}
}
@Override
public void onDestroy() {
super.onDestroy();
mPresenter.destroy();
}
@Override
public void onDestroyView() {
super.onDestroyView();
mPresenter.sleep();
}
@Override
public void onViewCreated(final View view, final Bundle savedInstanceState) {
super.onViewCreated(view, savedInstanceState);
mPresenter.wakeUp();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment