Created
January 14, 2021 22:31
-
-
Save zlaval/cf57350a697c39e7a19fa71f312cfae2 to your computer and use it in GitHub Desktop.
Reactor Coroutine
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
data class Vehicle( | |
val name: String | |
) | |
data class Car( | |
val id: Long? = null, | |
val producer: String | |
) | |
data class DetailedCar( | |
val producer: String, | |
val type: String | |
) | |
object ReactorDatabase { | |
fun saveData(car: Car): Mono<Car> = Mono.just(Car(1, car.producer)) | |
} | |
object CoroutineDatabase { | |
suspend fun saveData(car: Car): Car = Car(1, car.producer) | |
} | |
object ReactiveWebService { | |
fun loadFromWeb(): Mono<String> = Mono.just("corolla") | |
.delayElement(Duration.ofMillis(100)) | |
} | |
object CoroutineWebService { | |
suspend fun loadFromWeb(): String { | |
delay(100) | |
return "MZ720" | |
} | |
} | |
object Mapper { | |
fun vehicleToCar(vehicle: Vehicle): Car = Car(producer = vehicle.name) | |
fun carToDetailedCar(car: Car, type: String) = DetailedCar(car.producer, type) | |
} | |
class CoroutineVsReactor { | |
private val stream = Flux.just(Vehicle("Toyota"), Vehicle("Ford")) | |
@Test | |
fun `reactor example`() { | |
val pipeline = stream.map { Mapper.vehicleToCar(it) } | |
.concatMap { ReactorDatabase.saveData(it) } | |
.zipWith(ReactiveWebService.loadFromWeb()) | |
.map { (car, type) -> Mapper.carToDetailedCar(car, type) } | |
StepVerifier.create(pipeline) | |
.expectSubscription() | |
.expectNextCount(2) | |
.verifyComplete() | |
} | |
@Test | |
fun `mixed example`() { | |
val pipeline = stream.concatMap { | |
mono { | |
val car = Mapper.vehicleToCar(it) | |
val savedCar = ReactorDatabase.saveData(car).awaitFirst() | |
val type = ReactiveWebService.loadFromWeb().awaitFirst() | |
Mapper.carToDetailedCar(savedCar, type) | |
} | |
} | |
StepVerifier.create(pipeline) | |
.expectSubscription() | |
.expectNextCount(2) | |
.verifyComplete() | |
} | |
@Test | |
fun `coroutine example`() { | |
val pipeline = | |
stream.concatMap { | |
mono { | |
val car = Mapper.vehicleToCar(it) | |
val savedCar = CoroutineDatabase.saveData(car) | |
val type = CoroutineWebService.loadFromWeb() | |
Mapper.carToDetailedCar(savedCar, type) | |
} | |
} | |
StepVerifier.create(pipeline) | |
.expectSubscription() | |
.expectNextCount(2) | |
.verifyComplete() | |
} | |
@Test | |
fun `coroutine parallel example`() { | |
val pipeline = stream.concatMap { | |
mono { | |
val car = Mapper.vehicleToCar(it) | |
val savedCarAsync = async { CoroutineDatabase.saveData(car) } | |
val typeAsync = async { CoroutineWebService.loadFromWeb() } | |
val type = typeAsync.await() | |
val savedCar = savedCarAsync.await() | |
Mapper.carToDetailedCar(savedCar, type) | |
} | |
} | |
StepVerifier.create(pipeline) | |
.expectSubscription() | |
.expectNextCount(2) | |
.verifyComplete() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment