Skip to content

Instantly share code, notes, and snippets.

@Nanamare
Last active February 6, 2021 17:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Nanamare/4aead807b933acc3a12628cd6ec17c82 to your computer and use it in GitHub Desktop.
Save Nanamare/4aead807b933acc3a12628cd6ec17c82 to your computer and use it in GitHub Desktop.
ByeBye RxOperator
class RxOperatorTest {
private val compositeDisposable = CompositeDisposable()
private val testScheduler = TestScheduler()
@BeforeEach
fun init() {
RxJavaPlugins.setComputationSchedulerHandler { testScheduler }
}
@AfterEach
fun release() {
compositeDisposable.clear()
}
/**
* Single.just, SingleJust : Single 로 Wrapping 된 아이템을 한번 방출한다.
* 특정 아이템을 Single 타입으로 변경하여 체이닝을 이어갈 때 사용하면 유용하겠다.
* ex) flatMap 으로 벗겨진 타입을 다시 Single 로 감싸준다
*/
@Test
fun `Single just - emit one number`() {
Single.just(7).subscribe { seven -> assertThat(seven).isEqualTo(7) }
.addTo(compositeDisposable)
}
@Test
fun `SingleJust - emit one number`() {
SingleJust(15).subscribe { seven -> assertThat(seven).isEqualTo(15) }
.addTo(compositeDisposable)
}
@ParameterizedTest
@MethodSource(value = ["provideInteger"])
fun `Single just - emit multiple number & reduce`(numbers: Single<Array<Int>>, result: Int) {
numbers.subscribe { number ->
val actual = number.reduce { acc, i -> acc + i }
assertThat(actual).isEqualTo(result)
}.addTo(compositeDisposable)
}
/**
* Delay : 해당 시간 이후 아이템을 배출
* 특정 비즈니스 로직을 수행하기 위해 사용되는 경우가 많다.
* ex) 아이템을 보여주고, 500 ms 뒤에는 사라지게 만든다
*/
@Test
fun `Single delay`() {
val testObserver = SingleJust(listOf(2, 4))
.delay(500L, TimeUnit.MILLISECONDS).test()
testObserver.assertEmpty()
testScheduler.advanceTimeBy(500L, TimeUnit.MILLISECONDS)
testObserver.assertValue(listOf(2, 4))
}
/**
* AmbArray : 가장 먼저 관찰된 아이템만 배출하고, 이후 나머지는 모두 무시한다
* ex) Gps_provider, Network_provider 등을 사용하여 사용자의 좌표를 받아오는데, 가장 먼저 관측된 아이템만 사용한다
*/
@Test
fun `Single ambArray`() {
val item1 = SingleJust(listOf(1, 2))
.delay(500L, TimeUnit.MILLISECONDS)
val item2 = SingleJust.just(listOf(3, 4))
.delay(100L, TimeUnit.MILLISECONDS)
val item3 = SingleJust.just(listOf(5, 6))
.delay(900L, TimeUnit.MILLISECONDS)
val testObserver = Single.ambArray(item1, item2, item3).test()
testScheduler.advanceTimeBy(2L, TimeUnit.SECONDS)
testObserver.assertValue(listOf(3, 4))
}
/**
* Timer : 일정시간 이후 1회 배출 되며 이때, Long 타입 0을 반환합니다
* A 이후 정해진 시간 이후 B 가 호출되어야하는 상황에 트리거의 역할을 할 수 있습니다.
*/
@Test
fun `Single timer`() {
val testObserver = Single.timer(500L, TimeUnit.MILLISECONDS).test()
testScheduler.advanceTimeBy(1L, TimeUnit.SECONDS)
assertThat(testObserver.values()).isEqualTo(listOf(0L))
}
/**
* Map : 전달 받은 데이터를 새로운 형태로 변형시켜준다
* ex) 특정 자료형에 공통된 변경을 적용할 때 매우 많이 사용됨
*/
@Test
fun `Single map - one item`() {
Single.just("Lucas").map { "$it.shin" }.subscribe { it ->
assertThat(it).isEqualTo("Lucas.shin")
}
}
@ParameterizedTest
@MethodSource("provideMapMock")
fun `Single map - multiple item`(list: Single<List<Int>>, expected: List<Int>) {
list.map { ints ->
ints.map { int ->
int * 11
}
}.subscribe { ints ->
assertThat(ints).isEqualTo(expected)
}
}
/**
* FlatMap : Map 과 마찬가지로 Single<R> 을 리턴하지만, 파리머터에 SingleSource 타입의 자료형이 반환되어야 합니다.
*/
@ParameterizedTest
@MethodSource("provideFlatMapMock")
fun `Single flatMap`(list: Single<List<Int>>, expected: List<Int>) {
// Function<? super T, ? extends SingleSource<? extends R>> mapper
val flatMapList = list.flatMap { ints ->
val newFlatMapList = ints.map { int -> int.toString() }
Single.just(newFlatMapList)
}.blockingGet()
// Function<? super T, ? extends R> mapper
val mapList = list.map { ints ->
ints.map { int -> int.toString() }
}.blockingGet()
assertThat(flatMapList).isEqualTo(mapList)
}
/**
* Concat : 아이템들을 모아서 호출된 순서에 맞게 방출합니다
* Rx Operator 에서 concat 이라는 키워드가 들어가있으면 순서를 보장해준다고 생각하면 편합니다
* 아래 예제에서 listOf(3, 4) 의 배출이 더 빠를 것 같지만, concat 메서드의 파리머터로 item1 이 먼저 호출되었기 때문에
* 500L 이후에 list(1, 2) 이 배출되고 이후에 100L 뒤에 list(3, 4) 가 호출됩니다. 따라서 앞의 메서드에서 오버헤드가 큰
* 일을 하게되면 성능에 문제가 될 수 있습니다
* ex)처리 순서가 중요할 때 사용이 용이합니다
*/
@Test
fun `Single concat`() {
val item1 = SingleJust(listOf(1, 2)).delay(500L, TimeUnit.MILLISECONDS)
val item2 = SingleJust(listOf(3, 4)).delay(100L, TimeUnit.MILLISECONDS)
val item3 = SingleJust(listOf(5, 6)).delay(900L, TimeUnit.MILLISECONDS)
val testObserver = Single.concat(item1, item2, item3).test()
testObserver.assertEmpty()
testScheduler.advanceTimeBy(500L, TimeUnit.MILLISECONDS)
testObserver.assertValue(listOf(1, 2))
testScheduler.advanceTimeBy(100L, TimeUnit.MILLISECONDS)
testObserver.assertValues(listOf(1, 2), listOf(3, 4))
testScheduler.advanceTimeBy(900L, TimeUnit.MILLISECONDS)
testObserver.assertValues(listOf(1, 2), listOf(3, 4), listOf(5, 6))
}
/**
* FromCallable, FromObservable, FromPublisher, FromFuture : 각 from prefix 를 제외한 타입을 파라미터로 받는다
* 비동기 처리 이후 실행 결과를 리턴할 수 있다. 비동기 처리 이후 결과를 받아 또 다른 로직을 실행해야 하는 경우에 유용하다
* ex) 내부 DB 에서 로직 수행 후 이후 결과를 받아, 다른 처리에 사용
*/
@Test
fun `Single - fromXXXXX`() {
fun emitList() = listOf(1, 2, 3, 4, 5)
// fromCallable
val fromCallable1 = Single.fromCallable(Callable(::emitList)).blockingGet()
val fromCallable2 = Single.fromCallable { emitList() }.blockingGet()
assertThat(fromCallable1).isEqualTo(fromCallable2)
// fromObservable
val fromObservable = Single.fromObservable(Observable.just(listOf(1, 2, 3, 4, 5))).test()
testScheduler.advanceTimeBy(1L, TimeUnit.SECONDS)
fromObservable.assertValue(listOf(1, 2, 3, 4, 5))
// fromPublisher
// Processor backPressure 지원하는 Subject
// Subject 에는 PublishSubject, BehaviorSubject(latest), ReplaySubject(cache),
// UnicastSubject(emit after subscribe) 등이 있고 각각 특징이 있어서 상황에 맞게 잘 골라 사용해야한다
val fromPublisher = Single.fromPublisher(PublishProcessor.just(100)).test()
fromPublisher.assertValue(100)
// fromFuture
val futureTask = Executors.newSingleThreadExecutor().submit<List<Int>> { listOf(1, 2, 3) }
val fromFuture = Single.fromFuture(futureTask).test()
fromFuture.assertValue(listOf(1, 2, 3))
}
/**
* Flowable : Single 이 하나의 아이템을 배출하면 Flowable 은 여러개 배출가능(그렇게 때문에 backPresure policy 존재)
* Interval : 정해진 간격으로 아이템 배출
* Filter : 작성한 조건의 결과가 true 결과면 아이템은 배출합니다
* Take : 원하는 횟수만큼 아이템을 배출한다. (만약 배출하려는 타겟의 크기가 횟수보다 작게되면 모두 방출하고 완료(onComplete) 호출)
*/
@Test
fun `Flowable - interval & filter`() {
val testObserver = Flowable.interval(1L, TimeUnit.SECONDS)
.filter { int -> int.toInt() % 2 == 0 }.take(10).test()
testScheduler.advanceTimeBy(10L, TimeUnit.SECONDS)
testObserver.assertValues(0, 2, 4, 6, 8)
}
/**
* Distinct : 중복되는 아이템을 제거해준다 (내부적으로 createHashSet 로 관리됨)
*/
@Test
fun `Flowable - distinct`() {
val testObserver = Flowable.just(0, 1, 0, 1, 2, 2)
.distinct().test()
testScheduler.advanceTimeBy(10L, TimeUnit.SECONDS)
testObserver.assertValues(0, 1, 2)
}
/**
* Skip : 지정한 횟수만큼 아이템 배출을 생략한다
*/
@Test
fun `Flowable - interval & skip`() {
val testObserver = Flowable.interval(1L, TimeUnit.SECONDS)
.take(10)
.skip(2)
.test()
testScheduler.advanceTimeBy(10L, TimeUnit.SECONDS)
// Skip 0, 1
testObserver.assertValues(2, 3, 4, 5, 6, 7, 8, 9)
}
@Test
fun `Single - zip`() {
val item1 = Single.just("Nanamare")
val item2 = Single.just("Kinamare")
Single.zip(item1, item2, { item1, item2 ->
val finalItem = "$item1 is $item2"
assertThat(finalItem).isEqualTo("Nanamare is Kinamare")
})
}
/**
* Sample : 흐름 제어 Operator 로 배출되는 아이템에서 정해진 간격에서 가장 최근 값들을 관찰한다
*/
@Test
fun `Flowable - sample`() {
val testSubscriber = Flowable.interval(50L, TimeUnit.MILLISECONDS)
.sample(100L, TimeUnit.MILLISECONDS, false)
.take(5)
.test()
testScheduler.advanceTimeBy(10L, TimeUnit.SECONDS)
testSubscriber.assertValues(0, 2, 4, 6, 8)
}
/**
* Debounce : 흐름제어 Operator 로 이벤트들을 그룹핑하여, 특정 주기의 마지막 값만 배출한다
* ex) 관련 검색 기능을 사용할 때 유용하다. 가령 Hello 를 입력하면
* H -> API 호출 -> e -> API 호출 -> l -> API 호출... 순으로 될 수 있지만,
* H -> e -> l -> l -> O -> API 호출로 처리할 수 있다.
*/
@Test
fun `Flowable - Debounce`() {
val testObserver = Flowable.concat(
Flowable.timer(100L, TimeUnit.MILLISECONDS).map { 1 },
Flowable.timer(100L, TimeUnit.MILLISECONDS).map { 2 },
Flowable.timer(100L, TimeUnit.MILLISECONDS).map { 3 },
Flowable.timer(100L, TimeUnit.MILLISECONDS).map { 4 },
).debounce(300L, TimeUnit.MILLISECONDS).test()
testScheduler.advanceTimeBy(1L, TimeUnit.SECONDS)
testObserver.assertValue(4)
}
companion object {
@JvmStatic
fun provideInteger(): Stream<Arguments> {
return Stream.of(
Arguments.of(Single.just(arrayOf(1)), 1),
Arguments.of(Single.just(arrayOf(1, 2, 3, 4, 5)), 15)
)
}
@JvmStatic
fun provideMapMock(): Stream<Arguments> {
return Stream.of(
Arguments.of(Single.just(listOf(1, 2, 3, 4, 5)), listOf(11, 22, 33, 44, 55)),
Arguments.of(Single.just(listOf(5, 6, 7, 8, 9)), listOf(55, 66, 77, 88, 99)))
}
@JvmStatic
fun provideFlatMapMock(): Stream<Arguments> {
return Stream.of(
Arguments.of(Single.just(listOf(1, 2, 3, 4, 5)), listOf("1", "2", "3", "4", "5")),
Arguments.of(Single.just(listOf(5, 6, 7, 8, 9)), listOf("5", "6", "7", "8", "9")))
}
private val TAG = RxOperatorTest::class.java.simpleName
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment