Skip to content

Instantly share code, notes, and snippets.

@feresr
Last active Aug 30, 2021
Embed
What would you like to do?
package com.glovoapp.notification.in_app_notification
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
data class Image(val tag: String, val needsPostProcess: Boolean)
data class Post(val id: Int)
class NetworkRx() {
fun getPosts(): Single<List<Post>> {
return Single.just(emptyList())
}
fun getImages(postId: Int): Single<List<Image>> {
return Single.just(emptyList())
}
}
class NetworkCoroutine() {
suspend fun getPosts(): List<Post> {
delay((500..3000).random().toLong())
val postCount = (1..10).random()
val posts = mutableListOf<Post>()
repeat(postCount) {
posts.add(Post(it))
}
return posts
}
suspend fun getImages(postId: Int): List<Image> {
val images = mutableListOf<Image>()
val imageCount = (1..6).random()
repeat(imageCount) {
images.add(Image("$it", true))
}
delay(imageCount * 200.toLong())
return images
}
}
fun postProcessRx(image: Image): Single<Image> {
return Single.just(Image("", true))
}
suspend fun postProcess(image: Image): Image {
return Image("", true)
}
data class ProcessedPost(val post: Post, val images: List<Image>)
val networkRx: NetworkRx = NetworkRx()
val network: NetworkCoroutine = NetworkCoroutine()
private fun processRx(): Single<List<ProcessedPost>> {
// main thread
return networkRx
.getPosts()
.flattenAsObservable { it }
.concatMapEager { post -> // concat: keep the post order, eager: parallelism
networkRx
.getImages(post.id)
.flattenAsObservable { it }
.observeOn(Schedulers.computation())
.concatMapEager { image ->
if (image.needsPostProcess) {
postProcessRx(image).toObservable()
} else {
Observable.just(image)
}
}
.toList()
.observeOn(AndroidSchedulers.mainThread())
.map { images -> ProcessedPost(post, images.sortedBy { !it.needsPostProcess }) }
.toObservable()
}
.toList()
}
fun main() = runBlocking {
processCoroutine().forEach {
println(it.post.id)
}
}
suspend fun processCoroutine(): List<ProcessedPost> {
val posts = network.getPosts()
return coroutineScope {
posts.map { post ->
async {
val postImages = network.getImages(post.id)
val processedImages = postImages.map {
async(Dispatchers.Default) { if (it.needsPostProcess) postProcess(it) else it }
}
ProcessedPost(post, processedImages.awaitAll().sortedBy { !it.needsPostProcess })
}
}.awaitAll()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment