cold / hot observable
~ cold: 데이터 들어오면 바로 처리 ~ ~ hot: 데이터가 들어오면 데이터를 홀드한 상태로 뭔가 다른 처리를 할 수 있도록 해줌 ~
cold: 일반적인 observable. 구독을 할때마다 새로운 스트림 생성. 그래서 붙어있는 모든 오퍼레이션이 각 스트림마다 실행이 됨-> 퍼포먼스 저하
hot: ConnectableObservable. publish()
로 만들 수 있음. 하나의 스트림. subscriber
가 있든 없든 아이템 emit
. 단 connect()
가 호출되어야 함. 매번 커넥트가 귀찮을땐 refCount()
-> subscriber
레퍼런스 갯수를 봐서 자동으로 해줌 -> 하지만 이때는 Observable
이 아닌 PublishSuject
로 해야 원하는 결과(multicast)를 얻을 수 있음
subject 에서 onError 는 onComplete 랑 같아서 item이 emit 되지 않음 -> 이땐 RxRelay의 PublishRelay? 를 쓰면 된다함 http://futurice.com/blog/top-7-tips-for-rxjava-on-android
unsubscribe onComplete unsubscribe는 서브젝트가 더이상 아이템을 emit 하지 않음....????
CompositeSubscription 서브스크립션 그룹 관리 -> 한번에 unsubscribe 할 수 있음!! 안드에서 라이프사이클에 관련해서 메모리릭등 문제를 피하기 위해서 많이 사용되는듯!
compose
: 뭔가 공통적인 오퍼레이션을 묶어줄 수 있음
observable
.compose(getTransformer())
.subscribe(str -> {
adapter2.addItem(str);
});
}
@NonNull
private <T> Observable.Transformer<T, T> getTransformer() {
return observable -> observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
hot observable : 하나의 스트림을 공유
cold : 서브스크립션할때마다 새 스트림 생성, 그전에 emit된 아이템들 다시 받음 -> 모든 오퍼레이션이 다시 실행됨
publish
: Hot observable 생성, connect해야 함refCount
: subscribe를 하면 자동으로 connect해줌replay
: emit된 아이템 다시 emit(이땐 이미 오퍼레이션을 통과한 녀석들을 받음)rxReplayshare
:connect
되기 전에 emit된 아이템 emit해줌
final Count count = new Count();
ConnectableObservable<String> observable = Observable
.range(0, 10)
.timestamp()
.map(timestamped -> String.format("[%d] %d", timestamped.getValue(), timestamped.getTimestampMillis()))
.doOnNext(value -> count.increase())
.publish();
observable.subscribe(value -> {
System.out.println("subscriber1 : " + value);
});
observable.subscribe(value -> {
System.out.println("subscriber2 : " + value);
});
observable.connect();
System.out.println("연산횟수 : " + count.count());
hot 은 커넥터블 옵저버블을 사용함, 이때 커넥트를 하지 않으면 아이템이 emit 되지 않음
옵저버블 선택하기 하나의 스트림에 하나의 서브스크라이버 -> 그냥 Observable | 이전에 emit된 아이템이 필요 없을 경우 -> Observable.publish().refCount() | 모든 emit된 아이템이 필요할 경우 -> Observable.replay().refCount() | 가장 최근에 emit된 아이템이 필요할 경우 -> Observable.compose(ReplayingShare.instance())
RxJava Test
테스트용 서브스크라이버를 사용하면 됨
TestSubscriber.create();
로 옵저버블을 서브스크라이브 하고 getOnNextEvents()
메소드로 아이템을 가져다가 assert 하면 되는듯.
블로킹으로 가져와서 할수도 있지만 별로 좋진 않음? -> 쓰레드 후커? 프록시를 써서 하면 할 수 있음
public class TestSchedulerProxy {
private static final TestScheduler SCHEDULER = new TestScheduler();
private static final TestSchedulerProxy INSTANCE = new TestSchedulerProxy();
static {
try {
RxJavaPlugins.getInstance().registerSchedulersHook(new RxJavaSchedulersHook() {
@Override
public Scheduler getIOScheduler() {
return SCHEDULER;
}
@Override
public Scheduler getComputationScheduler() {
return SCHEDULER;
}
@Override
public Scheduler getNewThreadScheduler() {
return SCHEDULER;
}
});
RxAndroidPlugins.getInstance().registerSchedulersHook(new RxAndroidSchedulersHook() {
@Override
public Scheduler getMainThreadScheduler() {
return SCHEDULER;
}
});
} catch (IllegalStateException e) {
throw new IllegalStateException("Schedulers class already initialized. " +
"Ensure you always use the TestSchedulerProxy in unit tests.");
}
}
public static TestSchedulerProxy get() {
return INSTANCE;
}
public void advanceBy(long delayTime, TimeUnit unit) {
SCHEDULER.advanceTimeBy(delayTime, unit);
}
}
이런식으로 후커를 만들고, 테스트 케이스안에서 스케줄러에 대해 아래 메소드를 호출하면 시간을 앞당인다????고 해야하나...?????
proxy.advanceBy(1, TimeUnit.SECONDS);
creating observable
되도록 Observable.create
는 쓰지 말도록-> OnSubscribe
imple 안에서 onNext
, onError
, onComplete
를 매뉴얼하게 호출해줘야 하는데 실수하면 메모리릭 가능성
그래서 from
을 쓰거나 defer
같은걸 쓰는게 좋을듯?
Observable.empty()
자주 사용하시는듯
Observable.error(new NullPointerException("error"));
도 자주 사용 -> validation 체크할 때
Observable.timer(100, TimeUnit.MILLISECONDS, testScheduler);
도 자주 사용 -> unsubscribe
잘 해줘야함
테스트할 떄 subscriber = new TestSubscriber();
를 사용하면 좋음
subscriber.assertValue("name");
이렇게 assert
할 수 도 있음
subscribeOn
하나의 스트림에 대해서 한번만 호출 가능(여러번 호출해도 최초에 호출한 스케쥴러만 적용됨).
아이템이 emit되는 쓰레드를 설정.
observeOn
하나의 스트림에 대해 여러번 호출 가능. 호출한 뒤로 체이닝된 오퍼레이션의 쓰레드를 설정(참고로 오퍼레이션이 스케쥴러를 바꾸기도함).
왜 subscribeOn
은 subscribe
할때 호출 하는가? -> subscribeOn
은 아이템의 emit과 관련된 스케줄러 설정인데 그렇다면 스트림 생성 후 바로 호출해줘야 맞는거 같음 -> 하지만 이렇게 되면 subscriber
가 이미 설정된 스케줄러를 바꿀 수 없음. 그래서 최대한 뒤로 미뤄서 하는듯???
일반적인 observable
(cold) 일 경우 subscribeOn
, subscribe
할때마다 새로운 스트림이 생성되기 때문에 각기 다른 스케쥴러를 줄 수 있음
flatMap
하나의 아이템을 받아서 여러개의 아이템으로 주고 싶을 때. 하지만 순서가 보장되지 않음.
concatMap
flatMap이랑 같은데 순서가 보장됨.
operators : http://rxmarbles.com/
4가지 서브젝트 타입
- AsyncSubject
- BehaviorSubject
- PublishSubject
- ReplaySubject
A Subject is a sort of bridge or proxy that acts both as an Subscriber and as an Observable. Because it is a Subscriber, it can subscribe to one or more Observables, and because it is an Observable, it can pass through the items it observes by reemitting them, and it can also emit new items.
http://reactivex.io/documentation/subject.html Subject = Observable + Subscriber Observable, Subscriber 연결하는 녀석??
- AsyncSubject
onComplete
바로 이전의(마지막) item emit
- BehaviorSubject
it begins by emitting the item most recently emitted by the source Observable (or a seed/default value if none has yet been emitted) and then continues to emit any other items emitted later by the source Observable(s).
subscribe
시점 이후 가장 최근의 item + 나머지 item
기본값을 전달해주는 용도도 가능
- PublishSubject
기본적인 subject
subscribe
이후에 emit 되는 모든 item 받음
- ReplaySubject
emit 된 모든 item 을 버퍼에 저장하고 subscribe
하는 모든 Observer
에게 전달 (순서 보장)
ReplaySubject.create(int capacity)
: unbound. 미리 버퍼(배열) 할당 해주는 용(배열 카피 오버헤드 방지) 기본값 32k
ReplaySubject.createWithSize(int size)
: bound됨. 최대 버퍼 크기를 지정
Observable
은 생성 시점에 데이터를 넣어줘야 함. 근데 Subject
는 생성 후에 넣어줌
When you use a Subject as a Subscriber, take care not to call its onNext( ) method (or its other on methods) from multiple threads, as this could lead to non-serialized calls, which violates the Observable contract and creates an ambiguity in the resulting Subject.
To protect a Subject from this danger, you can convert it into a SerializedSubject with code like the following:
mySafeSubject = new SerializedSubject( myUnsafeSubject );
CompositeSubscription
서브스크립션을 한곳에 모아서 관리할때 사용 -> 보통 안드 라이프싸이클이랑 엮어서 씀
-> RxLifeCycle
을 이용해서 compose
를 하면됨
RxUserBus.sub().compose(bindToLifecycle()).subscribe((String s) -> {
Toast.makeText(this, s, Toast.LENGTH_SHORT).show();
});
자동으로 onPause
, onResume
될때 처리해줌
WeakReference
랑 엮어서 사용해보는건 별로 -> null
로 나와서??
static {
String TAG = "Rx-Debug Example";
RxJavaPlugins.getInstance().registerObservableExecutionHook(new DebugHook(new DebugNotificationListener() {
public Object onNext(DebugNotification n) {
Log.v(TAG, "onNext on " + n);
return super.onNext(n);
}
public Object start(DebugNotification n) {
Log.v(TAG, "start on " + n);
return super.start(n);
}
public void complete(Object context) {
Log.v(TAG, "complete on " + context);
}
public void error(Object context, Throwable e) {
Log.e(TAG, "error on " + context);
}
}));
}
registerObservableExecutionHook
로 훅을 걸어줄 수 있음.
이것보다 frodo를 사용하면 좀 더 쉽게 annotation processor 를 이용해서 쉽게 사용 가능함
public interface GitHubService {
@RxLogObservable(RxLogObservable.Scope.EVERYTHING)
@GET("/search/users?")
Observable<SearchResult> searchUsers(@Query("q") String query);
}
저렇게 어노테이션 붙이면 로깅을 할 수 있음....
attach debugger to android process
Break point를 사용해서 로그 찍기
cmd + F8
: break point 찍기
cmd + shift + F8
: break point 옵션 다이얼로그 띄우기
suspend
uncheck 하고
log evaluated expression
에 찍고자 하는 표현식 작성
MVCC: multiversion concurrency control
Realm 에서는 모든 인스턴스가 ThreadLocal
copyFromRealm
: 렘의 managed
객체를 stand alone?(unmanaged)
객체로 만들어 줌
retrofit의 RxJavaCallAdapterFactory
Observable.create()
통해서 생성할땐 반드시 isUnsubscribed()
체크를 해야하고, onNext
, onError
, onComplete
를 잘 써야함
ex) 이미
unsubscribe
됬는데onNext
로 아이템을 emit 하는 경우 체크해야하고,onError
호출시onComplete
는 호출되지 않아야 함, 등등....
@Override
public Observable<DynamicRealm> from(DynamicRealm realm) {
final RealmConfiguration realmConfig = realm.getConfiguration();
return Observable.create(new Observable.OnSubscribe<DynamicRealm>() {
@Override
public void call(final Subscriber<? super DynamicRealm> subscriber) {
// Get instance to make sure that the Realm is open for as long as the
// Observable is subscribed to it.
final DynamicRealm observableRealm = DynamicRealm.getInstance(realmConfig);
final RealmChangeListener<DynamicRealm> listener = new RealmChangeListener<DynamicRealm>() {
@Override
public void onChange(DynamicRealm realm) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(observableRealm);
}
}
};
observableRealm.addChangeListener(listener);
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
observableRealm.removeChangeListener(listener);
observableRealm.close();
}
}));
// Immediately call onNext with the current value, as due to Realm's auto-update, it will be the latest
// value.
subscriber.onNext(observableRealm);
}
});
}
subscriber.add(Subscriptions.create(new Action0()
이 부분이 꼭 필요함??? -> ??
https://youtu.be/hHnTIMjd1Y8?t=15m14s
RxJava in Action
RxAndroid, RxBinding, ReRelay
RxBinding: ui 위젯의 버튼 클릭, 스크롤 이벤트를 모두 Observable
로 제공
RxView.Click(view)
거의 모든곳에서 사용 ㄷㄷ
interval()
withLatestFrom(observable)
: 이벤트 발생시 다른 Observable
의 마지막 아이템과 합쳐서 처리?
distinctUntilChanged()
: 이벤트가 다를 경우에만 처리
startWith(sth)
: 구독시 첫 아이템으로 시작 할 수 있도록 해줌
onBackpressureDrop()
: 보통은 UI 쓰레드에서 버퍼를 쓰는데, 드랍은 이벤트 처리가 지연될때 유용, timer로 뭔가 처리할때 처리가 지연되면 안될 때, 굳이 모든 아이템을 받을 필요가 없을 때
롤링 배너를 만든다. 1초마다 페이징 그런데 손으로 드래그할땐 무시해야함.
val timer = Observable.interval(1000, TimeUnit.MILLISECONDS)
val dragging = RxViewPager.pageScrollStateChanges(vp_activity_view_pager)
.map { ViewPager.SCROLL_STATE_DRAGGING == it }
.distinctUntilChanged()
.startWith(false)
subscription.add(timer.withLatestFrom(dragging, { timer, dragging -> dragging })
.filter { !it } // 드래그 중이 아닐때만
.retry()
.onBackpressureDrop()
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
with(vp_activity_view_pager) {
val currentIdx = currentItem
currentItem = if (currentIdx == adapter.count - 1) 0 else currentItem + 1
}
})
onError
, onComplete
발생시에도 스트림이 죽지 않음..
PublisRelay.create()
로 생성
어떤 조건이 되었을 때 뷰를 잠깐 보여주는 용도에서...이떄 반드시 backpressure
써줘야
private val visibilityPublishRelay by lazy { PublishRelay.create<Int>() }
with(subscription) {
add(visibilityPublishRelay
.retry()
.distinctUntilChanged()
.onBackpressureBuffer()
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
changeVisibility(it)
})
add(RxView.clicks(btn_activity_visibility_event)
.concatMap {
Observable.concat(
Observable.just(View.VISIBLE),
Observable.just(View.GONE).delay(3, TimeUnit.SECONDS))
}
.subscribe(visibilityPublishRelay))
}
private fun changeVisibility(toVisibility: Int) {
with(tv_activity_visibility_event) {
if (toVisibility == visibility) {
return
}
visibility = toVisibility
}
}
private val btnViewPager by lazy {
findViewById(R.id.btn_activity_main_view_pager) as Button
}
private val btnViewPager by lazy {
(findViewById(R.id.btn_activity_main_view_pager) as Button).apply {
textSize = 15f
}
}
android extentions
을 쓰면,
private val btnViewPager by lazy {
btn_activity_main_view_pager // 바로 xml 이름을
}
btnActivityMainViewPager.text = “”
btnActivityMainViewPager.setOnClockListener {
~~~
}
class OperatorMapSubscriptionItemListing : Observable.Operator<Listing, SubscriptionItem> {
override fun call(o: Subscriber<in Listing>): Subscriber<in SubscriptionItem> {
return object: Subscriber<SubscriptionItem>() {
override fun onError(e: Throwable?) {
o.onError(e)
}
override fun onCompleted() {
o.onCompleted()
}
override fun onNext(t: SubscriptionItem?) {
t?.let {
o.onNext(Listing(it.title, it.channelName,
"gdg://korea.android/listing/${it.itemId}?channelName=${it.channelName}"))
}
}
}
}
}
observable.lift(OperatorMapSubscriptionItemListing)
The following example shows how you can use the lift( )
operator to chain your custom operator (in this example: myOperator) alongside standard RxJava operators like ofType and map
:
fooObservable = barObservable.ofType(Integer).map({it*2}).lift(new
myOperator<T>()).map({"transformed by myOperator: " + it});
The following section shows how you form the scaffolding of your operator so that it will work correctly with lift( )
.
Subject<T, R>
Observable
이자 Observer
toSerialized
로 여러 쓰레드서 접근해도 괜찮
RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
: onObservableStart
를 호출하고 OnSubscribe
객체 리턴
return RxJavahooks.onObservableReturn(subscriber)
: Subscription
객체 리턴
java
Completable.create(new Completable.OnSubscribe() {
@Override
public void call(CompletableSubscriber completableSubscriber) {
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe();
GDG Korea Slack
half-but [5:16 PM] 안녕하세요 혹시 observable의 defer연산자에 대한 설명해주실수 있으신가요? 잘 이해가 가지 않아서요… 도움주세요
seongug.jung (정승욱) [6:51 PM] subscribe 되기전까지 대기하는 함수에요 이를테면
api.getData1()
.concatWith(api.getdata2())
...
위처럼 구성을 하면
getData1 과 getData2 가 거의 동시에 호출이 되요
그 과정에서 getData1 에서 에러가 나면
getData2 는 undelivered 에러가 발생하게 되요
(edited)
이런 경우를 방지할 수 있는 방법으로 defer 인데
concatWith 나 mergeWith 같은 경우에 쓰면설랑탕을 사왔는데 왜 먹지를 못하니
api.getData1()
.concatWith(Single.defer { api.getData2() })
...
이처럼 쓰면 getData1 이 처리가 끝나고 실제 concatWith 가 subscribe 가 되는 시점에 동작을 시작해요 이런 경우에도 쓸 수 있어요.
getLocal().concatWith(getApi())
이렇게 쓰면 로컬 보내고 최신 정보면 api 를 안 쏴도 되는 상황에서 문제가 될 수 있죠. 그럴때 defer 를 쓰면 로컬 보내고 갱신 안되도 됨으로 처리하면 api 를 안 쏘는 처리를 할 수 있어요 는 너무 늦게 대답한 느낌이네요
yjh5424 [7:53 PM] joined #reactivex along with geuntaek.
gaemi [11:19 AM]
최근에 .defer()
를 Hystrix 적용하면서 유용하게 사용했어요. ㅎㅎ
제 생각엔 .compose()
와 조합할 수 있는 최고의 함수가 아닐까 생각되네요.
gaemi [11:23 AM] 예를든다면 캐시를 사용하기 위하여 Single.Transformer 를 이런형태로 구성했어요.
public Single<T> call(Single<T> source) {
return Single.defer(() -> {
T cachedObj = findCache();
if (cachedObj != null) {
return Single.just(cachedObj);
}
return source.doOnSuccess(v -> {
Completable.fromAction(() -> putCache(v))
.subscribeOn(cacheScheduler)
.subscribe(Actions.empty(), Actions.empty());
});
});
}