Skip to content

Instantly share code, notes, and snippets.

@QuadFlask
Last active November 14, 2018 00:40
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save QuadFlask/145e80b4ac54d1541e2d38d9ce762a57 to your computer and use it in GitHub Desktop.
Save QuadFlask/145e80b4ac54d1541e2d38d9ce762a57 to your computer and use it in GitHub Desktop.
RxJava Study

RxJava Study github repo

1일차

cold / hot observable

~ cold: 데이터 들어오면 바로 처리 ~ ~ hot: 데이터가 들어오면 데이터를 홀드한 상태로 뭔가 다른 처리를 할 수 있도록 해줌 ~

cold: 일반적인 observable. 구독을 할때마다 새로운 스트림 생성. 그래서 붙어있는 모든 오퍼레이션이 각 스트림마다 실행이 됨-> 퍼포먼스 저하

hot: ConnectableObservable. publish() 로 만들 수 있음. 하나의 스트림. subscriber가 있든 없든 아이템 emit. 단 connect()가 호출되어야 함. 매번 커넥트가 귀찮을땐 refCount()-> subscriber 레퍼런스 갯수를 봐서 자동으로 해줌 -> 하지만 이때는 Observable이 아닌 PublishSuject 로 해야 원하는 결과(multicast)를 얻을 수 있음

참고: http://moka-a.github.io/android/rxAndroid_study/

subject 에서 onError 는 onComplete 랑 같아서 item이 emit 되지 않음 -> 이땐 RxRelay의 PublishRelay? 를 쓰면 된다함 http://futurice.com/blog/top-7-tips-for-rxjava-on-android

unsubscribe onComplete unsubscribe는 서브젝트가 더이상 아이템을 emit 하지 않음....????

CompositeSubscription 서브스크립션 그룹 관리 -> 한번에 unsubscribe 할 수 있음!! 안드에서 라이프사이클에 관련해서 메모리릭등 문제를 피하기 위해서 많이 사용되는듯!

realignist님 정리 자료

d2 자료


2일차

compose : 뭔가 공통적인 오퍼레이션을 묶어줄 수 있음

observable
				.compose(getTransformer())
				.subscribe(str -> {
					adapter2.addItem(str);
				});
	}

	@NonNull
	private <T> Observable.Transformer<T, T> getTransformer() {
		return observable -> observable
				.subscribeOn(Schedulers.io())
				.observeOn(AndroidSchedulers.mainThread());
	}

hot observable : 하나의 스트림을 공유

cold : 서브스크립션할때마다 새 스트림 생성, 그전에 emit된 아이템들 다시 받음 -> 모든 오퍼레이션이 다시 실행됨

  • publish : Hot observable 생성, connect해야 함
  • refCount : subscribe를 하면 자동으로 connect해줌
  • replay : emit된 아이템 다시 emit(이땐 이미 오퍼레이션을 통과한 녀석들을 받음)
  • rxReplayshare : connect되기 전에 emit된 아이템 emit해줌
final Count count = new Count();

		ConnectableObservable<String> observable = Observable
			.range(0, 10)
			.timestamp()
			.map(timestamped -> String.format("[%d] %d", timestamped.getValue(), timestamped.getTimestampMillis()))
			.doOnNext(value -> count.increase())
			.publish();

		observable.subscribe(value -> {
			System.out.println("subscriber1 : " + value);
		});

		observable.subscribe(value -> {
			System.out.println("subscriber2 : " + value);
		});

		observable.connect();
		System.out.println("연산횟수 : " + count.count());

hot 은 커넥터블 옵저버블을 사용함, 이때 커넥트를 하지 않으면 아이템이 emit 되지 않음

옵저버블 선택하기 하나의 스트림에 하나의 서브스크라이버 -> 그냥 Observable | 이전에 emit된 아이템이 필요 없을 경우 -> Observable.publish().refCount() | 모든 emit된 아이템이 필요할 경우 -> Observable.replay().refCount() | 가장 최근에 emit된 아이템이 필요할 경우 -> Observable.compose(ReplayingShare.instance())

RxJava Test

테스트용 서브스크라이버를 사용하면 됨 TestSubscriber.create(); 로 옵저버블을 서브스크라이브 하고 getOnNextEvents()메소드로 아이템을 가져다가 assert 하면 되는듯.

블로킹으로 가져와서 할수도 있지만 별로 좋진 않음? -> 쓰레드 후커? 프록시를 써서 하면 할 수 있음

public class TestSchedulerProxy {

    private static final TestScheduler SCHEDULER = new TestScheduler();
    private static final TestSchedulerProxy INSTANCE = new TestSchedulerProxy();

    static {
        try {
            RxJavaPlugins.getInstance().registerSchedulersHook(new RxJavaSchedulersHook() {
                @Override
                public Scheduler getIOScheduler() {
                    return SCHEDULER;
                }

                @Override
                public Scheduler getComputationScheduler() {
                    return SCHEDULER;
                }

                @Override
                public Scheduler getNewThreadScheduler() {
                    return SCHEDULER;
                }
            });

            RxAndroidPlugins.getInstance().registerSchedulersHook(new RxAndroidSchedulersHook() {
                @Override
                public Scheduler getMainThreadScheduler() {
                    return SCHEDULER;
                }
            });
        } catch (IllegalStateException e) {
            throw new IllegalStateException("Schedulers class already initialized. " +
                    "Ensure you always use the TestSchedulerProxy in unit tests.");
        }
    }

    public static TestSchedulerProxy get() {
        return INSTANCE;
    }

    public void advanceBy(long delayTime, TimeUnit unit) {
        SCHEDULER.advanceTimeBy(delayTime, unit);
    }
}

이런식으로 후커를 만들고, 테스트 케이스안에서 스케줄러에 대해 아래 메소드를 호출하면 시간을 앞당인다????고 해야하나...?????

proxy.advanceBy(1, TimeUnit.SECONDS);

3일차

creating observable

되도록 Observable.create 는 쓰지 말도록-> OnSubscribe imple 안에서 onNext, onError, onComplete 를 매뉴얼하게 호출해줘야 하는데 실수하면 메모리릭 가능성

그래서 from 을 쓰거나 defer 같은걸 쓰는게 좋을듯?

Observable.empty() 자주 사용하시는듯 Observable.error(new NullPointerException("error")); 도 자주 사용 -> validation 체크할 때 Observable.timer(100, TimeUnit.MILLISECONDS, testScheduler); 도 자주 사용 -> unsubscribe 잘 해줘야함

테스트할 떄 subscriber = new TestSubscriber(); 를 사용하면 좋음 subscriber.assertValue("name"); 이렇게 assert 할 수 도 있음


subscribeOn 하나의 스트림에 대해서 한번만 호출 가능(여러번 호출해도 최초에 호출한 스케쥴러만 적용됨). 아이템이 emit되는 쓰레드를 설정.

observeOn 하나의 스트림에 대해 여러번 호출 가능. 호출한 뒤로 체이닝된 오퍼레이션의 쓰레드를 설정(참고로 오퍼레이션이 스케쥴러를 바꾸기도함).

subscribeOnsubscribe할때 호출 하는가? -> subscribeOn은 아이템의 emit과 관련된 스케줄러 설정인데 그렇다면 스트림 생성 후 바로 호출해줘야 맞는거 같음 -> 하지만 이렇게 되면 subscriber가 이미 설정된 스케줄러를 바꿀 수 없음. 그래서 최대한 뒤로 미뤄서 하는듯???

일반적인 observable(cold) 일 경우 subscribeOn, subscribe 할때마다 새로운 스트림이 생성되기 때문에 각기 다른 스케쥴러를 줄 수 있음


flatMap 하나의 아이템을 받아서 여러개의 아이템으로 주고 싶을 때. 하지만 순서가 보장되지 않음.

concatMap flatMap이랑 같은데 순서가 보장됨.

4일차

Subject in RxJava

4가지 서브젝트 타입

  • AsyncSubject
  • BehaviorSubject
  • PublishSubject
  • ReplaySubject

A Subject is a sort of bridge or proxy that acts both as an Subscriber and as an Observable. Because it is a Subscriber, it can subscribe to one or more Observables, and because it is an Observable, it can pass through the items it observes by reemitting them, and it can also emit new items.

http://reactivex.io/documentation/subject.html Subject = Observable + Subscriber Observable, Subscriber 연결하는 녀석??

  1. AsyncSubject

onComplete 바로 이전의(마지막) item emit

  1. BehaviorSubject

it begins by emitting the item most recently emitted by the source Observable (or a seed/default value if none has yet been emitted) and then continues to emit any other items emitted later by the source Observable(s).

subscribe 시점 이후 가장 최근의 item + 나머지 item 기본값을 전달해주는 용도도 가능

  1. PublishSubject

기본적인 subject subscribe 이후에 emit 되는 모든 item 받음

  1. ReplaySubject

emit 된 모든 item 을 버퍼에 저장하고 subscribe 하는 모든 Observer 에게 전달 (순서 보장)

ReplaySubject.create(int capacity) : unbound. 미리 버퍼(배열) 할당 해주는 용(배열 카피 오버헤드 방지) 기본값 32k

ReplaySubject.createWithSize(int size) : bound됨. 최대 버퍼 크기를 지정

Why subject?

Observable 은 생성 시점에 데이터를 넣어줘야 함. 근데 Subject는 생성 후에 넣어줌

Serializing

When you use a Subject as a Subscriber, take care not to call its onNext( ) method (or its other on methods) from multiple threads, as this could lead to non-serialized calls, which violates the Observable contract and creates an ambiguity in the resulting Subject.

To protect a Subject from this danger, you can convert it into a SerializedSubject with code like the following:

mySafeSubject = new SerializedSubject( myUnsafeSubject );

5일차

CompositeSubscription

서브스크립션을 한곳에 모아서 관리할때 사용 -> 보통 안드 라이프싸이클이랑 엮어서 씀 -> RxLifeCycle을 이용해서 compose를 하면됨

RxUserBus.sub().compose(bindToLifecycle()).subscribe((String s) -> {
    Toast.makeText(this, s, Toast.LENGTH_SHORT).show();
});

자동으로 onPause, onResume 될때 처리해줌

WeakReference랑 엮어서 사용해보는건 별로 -> null로 나와서??

디버깅

1. rxjava-debug

static {
	String TAG = "Rx-Debug Example";
	RxJavaPlugins.getInstance().registerObservableExecutionHook(new DebugHook(new DebugNotificationListener() {
	    public Object onNext(DebugNotification n) {
	        Log.v(TAG, "onNext on " + n);
	        return super.onNext(n);
	    }
	
	
	    public Object start(DebugNotification n) {
	        Log.v(TAG, "start on " + n);
	        return super.start(n);
	    }
	
	
	    public void complete(Object context) {
	        Log.v(TAG, "complete on " + context);
	    }
	
	    public void error(Object context, Throwable e) {
	        Log.e(TAG, "error on " + context);
	    }
	}));
}

registerObservableExecutionHook 로 훅을 걸어줄 수 있음.

이것보다 frodo를 사용하면 좀 더 쉽게 annotation processor 를 이용해서 쉽게 사용 가능함

public interface GitHubService {
    @RxLogObservable(RxLogObservable.Scope.EVERYTHING)
    @GET("/search/users?")
    Observable<SearchResult> searchUsers(@Query("q") String query);
}

저렇게 어노테이션 붙이면 로깅을 할 수 있음....

3. IntelliJ 기본 기능 활용; Break point를 사용해서 로그 찍기

attach debugger to android process

Break point를 사용해서 로그 찍기

cmd + F8: break point 찍기

cmd + shift + F8: break point 옵션 다이얼로그 띄우기

suspend uncheck 하고

log evaluated expression 에 찍고자 하는 표현식 작성


MVCC: multiversion concurrency control

Realm 에서는 모든 인스턴스가 ThreadLocal

copyFromRealm: 렘의 managed 객체를 stand alone?(unmanaged) 객체로 만들어 줌

retrofit의 RxJavaCallAdapterFactory

realm의 RealmObservableFactory

Observable.create() 통해서 생성할땐 반드시 isUnsubscribed() 체크를 해야하고, onNext, onError, onComplete를 잘 써야함

ex) 이미 unsubscribe 됬는데 onNext로 아이템을 emit 하는 경우 체크해야하고, onError호출시 onComplete는 호출되지 않아야 함, 등등....

@Override
public Observable<DynamicRealm> from(DynamicRealm realm) {
	final RealmConfiguration realmConfig = realm.getConfiguration();
	return Observable.create(new Observable.OnSubscribe<DynamicRealm>() {
	    @Override
	    public void call(final Subscriber<? super DynamicRealm> subscriber) {
	        // Get instance to make sure that the Realm is open for as long as the
	        // Observable is subscribed to it.
	        final DynamicRealm observableRealm = DynamicRealm.getInstance(realmConfig);
	        final RealmChangeListener<DynamicRealm> listener = new RealmChangeListener<DynamicRealm>() {
	            @Override
	            public void onChange(DynamicRealm realm) {
	                if (!subscriber.isUnsubscribed()) {
	                    subscriber.onNext(observableRealm);
	                }
	            }
	        };
	        observableRealm.addChangeListener(listener);
	        subscriber.add(Subscriptions.create(new Action0() {
	            @Override
	            public void call() {
	                observableRealm.removeChangeListener(listener);
	                observableRealm.close();
	            }
	        }));
	
	            // Immediately call onNext with the current value, as due to Realm's auto-update, it will be the latest
	        // value.
	        subscriber.onNext(observableRealm);
	    }
	});
}

subscriber.add(Subscriptions.create(new Action0() 이 부분이 꼭 필요함??? -> ??

6일차

https://youtu.be/hHnTIMjd1Y8?t=15m14s

RxJava in Action

RxAndroid, RxBinding, ReRelay

RxBinding: ui 위젯의 버튼 클릭, 스크롤 이벤트를 모두 Observable로 제공

RxView.Click(view)

거의 모든곳에서 사용 ㄷㄷ

Operators

interval()

withLatestFrom(observable): 이벤트 발생시 다른 Observable의 마지막 아이템과 합쳐서 처리?

distinctUntilChanged(): 이벤트가 다를 경우에만 처리

startWith(sth): 구독시 첫 아이템으로 시작 할 수 있도록 해줌

onBackpressureDrop(): 보통은 UI 쓰레드에서 버퍼를 쓰는데, 드랍은 이벤트 처리가 지연될때 유용, timer로 뭔가 처리할때 처리가 지연되면 안될 때, 굳이 모든 아이템을 받을 필요가 없을 때

롤링 배너를 만든다. 1초마다 페이징 그런데 손으로 드래그할땐 무시해야함.

val timer = Observable.interval(1000, TimeUnit.MILLISECONDS)

val dragging = RxViewPager.pageScrollStateChanges(vp_activity_view_pager)
        .map { ViewPager.SCROLL_STATE_DRAGGING == it }
        .distinctUntilChanged()
        .startWith(false)
        
subscription.add(timer.withLatestFrom(dragging, { timer, dragging -> dragging })
        .filter { !it } // 드래그 중이 아닐때만 
        .retry()
        .onBackpressureDrop()
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe {
            with(vp_activity_view_pager) {
                val currentIdx = currentItem
                currentItem = if (currentIdx == adapter.count - 1) 0 else currentItem + 1
            }
        })

RxRelay - 안전한 이벤트 스트림?

onError, onComplete 발생시에도 스트림이 죽지 않음..

PublisRelay.create() 로 생성

어떤 조건이 되었을 때 뷰를 잠깐 보여주는 용도에서...이떄 반드시 backpressure써줘야

private val visibilityPublishRelay by lazy { PublishRelay.create<Int>() }

with(subscription) {
            add(visibilityPublishRelay
                    .retry()
                    .distinctUntilChanged()
                    .onBackpressureBuffer()
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe {
                        changeVisibility(it)
                    })

            add(RxView.clicks(btn_activity_visibility_event)
                    .concatMap {
                        Observable.concat(
                                Observable.just(View.VISIBLE),
                                Observable.just(View.GONE).delay(3, TimeUnit.SECONDS))
                    }
                    .subscribe(visibilityPublishRelay))
        }

private fun changeVisibility(toVisibility: Int) {
        with(tv_activity_visibility_event) {
            if (toVisibility == visibility) {
                return
            }
            visibility = toVisibility
        }
    }

Kotlin

private val btnViewPager by lazy {
    findViewById(R.id.btn_activity_main_view_pager) as Button
}

private val btnViewPager by lazy {
    (findViewById(R.id.btn_activity_main_view_pager) as Button).apply {
    	textSize = 15f
    }
}

android extentions 을 쓰면,

private val btnViewPager by lazy {
    btn_activity_main_view_pager // 바로 xml 이름을
}

btnActivityMainViewPager.text = “”
btnActivityMainViewPager.setOnClockListener {
	~~~
}
class OperatorMapSubscriptionItemListing : Observable.Operator<Listing, SubscriptionItem> {

    override fun call(o: Subscriber<in Listing>): Subscriber<in SubscriptionItem> {
        return object: Subscriber<SubscriptionItem>() {

            override fun onError(e: Throwable?) {
                o.onError(e)
            }

            override fun onCompleted() {
                o.onCompleted()
            }

            override fun onNext(t: SubscriptionItem?) {
                t?.let {
                    o.onNext(Listing(it.title, it.channelName,
                            "gdg://korea.android/listing/${it.itemId}?channelName=${it.channelName}"))
                }
            }
        }
    }
}

observable.lift(OperatorMapSubscriptionItemListing)

Sequence Operators

The following example shows how you can use the lift( ) operator to chain your custom operator (in this example: myOperator) alongside standard RxJava operators like ofType and map:

fooObservable = barObservable.ofType(Integer).map({it*2}).lift(new 
myOperator<T>()).map({"transformed by myOperator: " + it});

The following section shows how you form the scaffolding of your operator so that it will work correctly with lift( ).

Subject<T, R>

Observable이자 Observer

toSerialized로 여러 쓰레드서 접근해도 괜찮

RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);: onObservableStart를 호출하고 OnSubscribe객체 리턴

return RxJavahooks.onObservableReturn(subscriber): Subscription 객체 리턴

If using Observable.just(), consider Completable.create() or Completable.defer()

java

Completable.create(new Completable.OnSubscribe() {
	@Override
		public void call(CompletableSubscriber completableSubscriber) {

	}
}).observeOn(AndroidSchedulers.mainThread()).subscribe();

GDG Korea Slack

half-but [5:16 PM] 안녕하세요 혹시 observable의 defer연산자에 대한 설명해주실수 있으신가요? 잘 이해가 가지 않아서요… 도움주세요

seongug.jung (정승욱) [6:51 PM] subscribe 되기전까지 대기하는 함수에요 이를테면

api.getData1()
  .concatWith(api.getdata2())
  ...

위처럼 구성을 하면 getData1 과 getData2 가 거의 동시에 호출이 되요 그 과정에서 getData1 에서 에러가 나면 getData2 는 undelivered 에러가 발생하게 되요 설랑탕을 사왔는데 왜 먹지를 못하니 (edited) 이런 경우를 방지할 수 있는 방법으로 defer 인데 concatWith 나 mergeWith 같은 경우에 쓰면

api.getData1()
  .concatWith(Single.defer { api.getData2() })
  ...

이처럼 쓰면 getData1 이 처리가 끝나고 실제 concatWith 가 subscribe 가 되는 시점에 동작을 시작해요 이런 경우에도 쓸 수 있어요.

getLocal().concatWith(getApi())

이렇게 쓰면 로컬 보내고 최신 정보면 api 를 안 쏴도 되는 상황에서 문제가 될 수 있죠. 그럴때 defer 를 쓰면 로컬 보내고 갱신 안되도 됨으로 처리하면 api 를 안 쏘는 처리를 할 수 있어요 는 너무 늦게 대답한 느낌이네요

yjh5424 [7:53 PM] joined #reactivex along with geuntaek.

gaemi [11:19 AM] 최근에 .defer() 를 Hystrix 적용하면서 유용하게 사용했어요. ㅎㅎ 제 생각엔 .compose() 와 조합할 수 있는 최고의 함수가 아닐까 생각되네요.

gaemi [11:23 AM] 예를든다면 캐시를 사용하기 위하여 Single.Transformer 를 이런형태로 구성했어요.

   public Single<T> call(Single<T> source) {
       return Single.defer(() -> {
           T cachedObj = findCache();
           if (cachedObj != null) {
               return Single.just(cachedObj);
           }

           return source.doOnSuccess(v -> {
               Completable.fromAction(() -> putCache(v))
                       .subscribeOn(cacheScheduler)
                       .subscribe(Actions.empty(), Actions.empty());
           });
       });
   }
   
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment