Skip to content

Instantly share code, notes, and snippets.

@zsxwing
Created February 24, 2014 10:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zsxwing/9185182 to your computer and use it in GitHub Desktop.
Save zsxwing/9185182 to your computer and use it in GitHub Desktop.
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.android.observables;
import java.lang.ref.WeakReference;
import rx.Notification;
import rx.Observable;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.operators.OperatorObserveFromAndroidComponent;
import android.app.Activity;
import android.app.Fragment;
import android.os.Build;
/**
 * Static helpers that bind an {@link Observable} to Android components
 * (activities, fragments) or to an arbitrary {@link Subscription}.
 */
public final class AndroidObservable {

    /**
     * Whether <code>android.support.v4.app.Fragment</code> could be loaded
     * when this class was initialized.
     */
    private static final boolean USES_SUPPORT_FRAGMENTS;

    static {
        boolean supportFragmentsAvailable = false;
        try {
            Class.forName("android.support.v4.app.Fragment");
            supportFragmentsAvailable = true;
        } catch (ClassNotFoundException ignored) {
            // Deliberately ignored: the support library is optional. When it
            // is absent the flag stays false and fromFragment never touches
            // the support Fragment type.
        }
        USES_SUPPORT_FRAGMENTS = supportFragmentsAvailable;
    }

    private AndroidObservable() {
        // Static utility class; not meant to be instantiated.
    }

    /**
     * Transforms a source observable to be attached to the given Activity, in
     * such a way that notifications will always arrive on the main UI thread.
     * Currently, this is equivalent to calling
     * <code>observeOn(AndroidSchedulers.mainThread())</code>, but this behavior
     * may change in the future, so it is encouraged to use this wrapper
     * instead.
     * <p>
     * You must unsubscribe from the returned observable in
     * <code>onDestroy</code> to not leak the given Activity.
     * <p>
     * Ex.:
     *
     * <pre>
     * // in any Activity
     * mSubscription = fromActivity(this, Observable.just("value")).subscribe(...);
     * // in onDestroy
     * mSubscription.unsubscribe();
     * </pre>
     *
     * @param activity
     *            the activity in which the source observable will be observed
     * @param sourceObservable
     *            the observable sequence to observe from the given Activity
     * @param <T>
     *            the type of the items emitted by the source observable
     * @return a new observable sequence that will emit notifications on the
     *         main UI thread
     */
    public static <T> Observable<T> fromActivity(Activity activity, Observable<T> sourceObservable) {
        return OperatorObserveFromAndroidComponent.observeFromAndroidComponent(sourceObservable, activity);
    }

    /**
     * Transforms a source observable to be attached to the given fragment, in
     * such a way that notifications will always arrive on the main UI thread.
     * Moreover, it will be guaranteed that no notifications will be delivered
     * to the fragment while it's in detached state (i.e. its host Activity was
     * destroyed.) In other words, during calls to onNext, you may assume that
     * fragment.getActivity() will never return null.
     * <p>
     * This method accepts both native fragments and support library fragments
     * in its first parameter. It will throw for unsupported types.
     * <p>
     * You must unsubscribe from the returned observable in
     * <code>onDestroy</code> to not leak the given fragment.
     * <p>
     * Ex.:
     *
     * <pre>
     * // in any Fragment
     * mSubscription = fromFragment(this, Observable.just("value")).subscribe(...);
     * // in onDestroy
     * mSubscription.unsubscribe();
     * </pre>
     *
     * @param fragment
     *            the fragment in which the source observable will be observed
     * @param sourceObservable
     *            the observable sequence to observe from the given fragment
     * @param <T>
     *            the type of the items emitted by the source observable
     * @return a new observable sequence that will emit notifications on the
     *         main UI thread
     * @throws IllegalArgumentException
     *             if <code>fragment</code> is neither a support library
     *             fragment nor (on Honeycomb and later) a native fragment;
     *             this includes <code>null</code>
     */
    public static <T> Observable<T> fromFragment(Object fragment, Observable<T> sourceObservable) {
        // The support-library check comes first and is guarded by the flag so
        // the support Fragment type is only referenced when it was loadable.
        if (USES_SUPPORT_FRAGMENTS && fragment instanceof android.support.v4.app.Fragment) {
            return OperatorObserveFromAndroidComponent.observeFromAndroidComponent(sourceObservable,
                    (android.support.v4.app.Fragment) fragment);
        } else if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.HONEYCOMB && fragment instanceof Fragment) {
            return OperatorObserveFromAndroidComponent.observeFromAndroidComponent(sourceObservable,
                    (Fragment) fragment);
        } else {
            throw new IllegalArgumentException(
                    "Target fragment is neither a native nor support library Fragment");
        }
    }

    /**
     * Return an Observable that mirrors the source, or keep silent if
     * <code>s.isUnsubscribed()</code> returns true.
     *
     * <pre>
     * <code>
     * public class TestActivity extends Activity {
     *
     *     private Subscription s;
     *
     *     private Observable&lt;String&gt; o; // After onDestroy, o emits nothing.
     *
     *     {@literal @}Override
     *     protected void onCreate(Bundle savedInstanceState) {
     *         super.onCreate(savedInstanceState);
     *         s = new BooleanSubscription();
     *         o = AndroidObservable.attachToSubscription(Observable.from("Hello!"), s, Functions.alwaysTrue());
     *     }
     *
     *     {@literal @}Override
     *     protected void onDestroy() {
     *         s.unsubscribe();
     *         super.onDestroy();
     *     }
     * }
     *
     * </code>
     * </pre>
     *
     * @param source
     *            the observable to mirror
     * @param s
     *            the subscription whose <code>isUnsubscribed()</code> state is
     *            checked for every notification; notifications are only taken
     *            while it returns false
     * @param filter
     *            predicate applied to each notification that is still taken;
     *            notifications for which it returns false are skipped
     * @param <T>
     *            the type of the items emitted by the source observable
     * @return the observable built by
     *         {@link #attach(Observable, Func1, Func1)} from these arguments
     */
    public static <T> Observable<T> attachToSubscription(Observable<T> source, final Subscription s,
            Func1<Notification<T>, Boolean> filter) {
        return attach(source, new Func1<Notification<T>, Boolean>() {
            @Override
            public Boolean call(Notification<T> t1) {
                return !s.isUnsubscribed();
            }
        }, filter);
    }

    /**
     * Return an Observable that mirrors the source for as long as
     * <code>component</code> has not been garbage collected. The component is
     * referenced only through a {@link WeakReference}, so the returned
     * observable does not keep it alive. While the component is alive,
     * notifications for which <code>filter.call</code> returns false are
     * skipped.
     * <p>
     * Unlike
     * {@link #attachToAndroidComponentInUIThread(Observable, Object, Func2)},
     * this method does not add an <code>observeOn</code> step of its own.
     *
     * @param source
     *            the observable to mirror
     * @param component
     *            the component whose reachability gates the notifications
     * @param filter
     *            predicate called with the live component and the current
     *            notification; returning false skips that notification
     * @param <T>
     *            the type of the items emitted by the source observable
     * @param <AndroidComponent>
     *            the type of the component
     * @return the observable built by
     *         {@link #attach(Observable, Func1, Func1)} from these arguments
     */
    public static <T, AndroidComponent> Observable<T> attachToAndroidComponent(Observable<T> source,
            AndroidComponent component, final Func2<AndroidComponent, Notification<T>, Boolean> filter) {
        // No explicit memory barrier is needed here: the reference is captured
        // by the anonymous classes below through final (synthetic) fields, and
        // locking a method-local monitor would not order anything with respect
        // to other threads anyway.
        final WeakReference<AndroidComponent> componentRef = new WeakReference<AndroidComponent>(component);
        return attach(source, new Func1<Notification<T>, Boolean>() {
            @Override
            public Boolean call(Notification<T> t1) {
                return componentRef.get() != null;
            }
        }, new Func1<Notification<T>, Boolean>() {
            @Override
            public Boolean call(Notification<T> t1) {
                // Hold a strong reference for the duration of the filter call
                // so the component cannot be collected between check and use.
                AndroidComponent target = componentRef.get();
                return target != null && filter.call(target, t1);
            }
        });
    }

    /**
     * Return an Observable that mirrors the source, or keep silent if
     * <code>component</code> is recycled by GC. If <code>component</code> is
     * alive but <code>filter.call</code> returns false, skip the current
     * message.
     * <p>
     * Note: <code>filter.call</code> runs in the UI thread, because the source
     * is first moved onto <code>AndroidSchedulers.mainThread()</code>.
     *
     * <pre>
     * <code>
     * Fragment fragment = ...;
     * Observable&lt;String&gt; o = AndroidObservable.attachToAndroidComponentInUIThread(Observable.from("Hello!"), fragment,
     *         new Func2&lt;Fragment, Notification&lt;String&gt;, Boolean&gt;() {
     *
     *             {@literal @}Override
     *             public Boolean call(Fragment fragment, Notification&lt;String&gt; t2) {
     *                 return fragment.isAdded();
     *             }
     *
     *         });
     * </code>
     * </pre>
     *
     * @param source
     *            the observable to mirror
     * @param component
     *            the component whose reachability gates the notifications
     * @param filter
     *            predicate called with the live component and the current
     *            notification; returning false skips that notification
     * @param <T>
     *            the type of the items emitted by the source observable
     * @param <AndroidComponent>
     *            the type of the component
     * @return the result of
     *         {@link #attachToAndroidComponent(Observable, Object, Func2)}
     *         applied to the source observed on the main thread scheduler
     */
    public static <T, AndroidComponent> Observable<T> attachToAndroidComponentInUIThread(Observable<T> source,
            AndroidComponent component, Func2<AndroidComponent, Notification<T>, Boolean> filter) {
        return attachToAndroidComponent(source.observeOn(AndroidSchedulers.mainThread()), component, filter);
    }

    /**
     * Builds the pipeline shared by the <code>attachTo...</code> helpers: the
     * source is materialized into {@link Notification}s, notifications are
     * taken while <code>isAttached</code> returns true, those rejected by
     * <code>filter</code> are dropped, and the remainder is dematerialized
     * back into a plain observable.
     * <p>
     * NOTE(review): both predicates operate on the materialized stream, so
     * they presumably also see terminal (error/completed) notifications;
     * confirm against the <code>materialize</code> documentation.
     *
     * @param source
     *            the observable to mirror
     * @param isAttached
     *            predicate passed to <code>takeWhile</code>
     * @param filter
     *            predicate passed to <code>filter</code>
     * @param <T>
     *            the type of the items emitted by the source observable
     * @return the dematerialized, gated and filtered observable
     */
    public static <T> Observable<T> attach(Observable<T> source, Func1<Notification<T>, Boolean> isAttached,
            Func1<Notification<T>, Boolean> filter) {
        return source.materialize().takeWhile(isAttached).filter(filter).dematerialize();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment