Skip to content

Instantly share code, notes, and snippets.

@santiagopoli
Last active June 14, 2018 00:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save santiagopoli/86aa0477d5a681ef9727eda8fafa8253 to your computer and use it in GitHub Desktop.
Save santiagopoli/86aa0477d5a681ef9727eda8fafa8253 to your computer and use it in GitHub Desktop.
SubscribeOnTest
import io.reactivex.Observable
import io.reactivex.ObservableEmitter
import io.vertx.reactivex.core.RxHelper
import io.vertx.reactivex.core.Vertx
import java.util.concurrent.TimeUnit
/**
 * Demo of how `subscribeOn` and `observeOn` pick the threads of an RxJava pipeline
 * when the schedulers come from Vert.x.
 *
 * The source lambda is subscribed on [blockingScheduler], while everything
 * downstream of `observeOn` is delivered on [nonBlockingScheduler]. Each stage prints
 * the name of the thread it runs on so the hand-off is visible in the output.
 */
object Main {
    /** Vert.x instance backing both schedulers; closed by [main] once the stream terminates. */
    val vertx = Vertx.vertx()

    /** Scheduler obtained from `RxHelper.blockingScheduler` for [vertx]; created on first use. */
    val blockingScheduler by lazy { RxHelper.blockingScheduler(vertx) }

    /** Scheduler obtained from `RxHelper.scheduler` for [vertx]; created on first use. */
    val nonBlockingScheduler by lazy { RxHelper.scheduler(vertx) }

    /** Prints `[operationName] <current thread name>` to standard output. */
    val printCurrentThread = { operationName: String ->
        println("[$operationName] ${Thread.currentThread().name}")
    }

    /**
     * Source callback for `Observable.create`: emits 0, 1, 2, ... with a one-second
     * blocking sleep before each value, until the emitter is disposed.
     *
     * An interrupt received while sleeping is treated as a request to stop: the
     * interrupt flag is restored and the loop exits, rather than letting the
     * exception escape the callback.
     */
    val expensiveComputation = { emitter: ObservableEmitter<Int> ->
        printCurrentThread("expensiveComputation")
        var number = 0
        while (!emitter.isDisposed) {
            try {
                TimeUnit.SECONDS.sleep(1)
            } catch (interrupted: InterruptedException) {
                // Keep the interrupt visible to whoever owns this thread, then stop emitting.
                Thread.currentThread().interrupt()
                break
            }
            emitter.onNext(number++)
        }
    }

    /**
     * Runs the pipeline: keeps even numbers, takes the first three, prints each one,
     * and closes [vertx] when the stream terminates.
     */
    @JvmStatic
    fun main(args: Array<String>) {
        Observable.create(expensiveComputation)
            .observeOn(nonBlockingScheduler)
            .filter { it % 2 == 0 }
            .doOnNext { printCurrentThread("onNext afterFilter") }
            .take(3)
            .doOnComplete { println("Done") }
            // Vert.x threads are non-daemon, so the instance must be closed on
            // completion, error or disposal for the JVM to be able to exit.
            .doFinally { vertx.close() }
            .subscribeOn(blockingScheduler)
            .subscribe { println("Received Number: $it") }
    }
}
@santiagopoli
Copy link
Author

santiagopoli commented Jun 14, 2018

Output:

[expensiveComputation] vert.x-worker-thread-0
[onNext afterFilter] vert.x-eventloop-thread-0
Received Number: 0
[onNext afterFilter] vert.x-eventloop-thread-0
Received Number: 2
[onNext afterFilter] vert.x-eventloop-thread-0
Received Number: 4
Done

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment