Skip to content

Instantly share code, notes, and snippets.

@Normal
Created December 27, 2017 22:41
Show Gist options
  • Save Normal/aa8fb1facdb4c6bd2bc8bb9514ce33f8 to your computer and use it in GitHub Desktop.
Save Normal/aa8fb1facdb4c6bd2bc8bb9514ce33f8 to your computer and use it in GitHub Desktop.
RxScala multithreaded
import java.util.concurrent.{Executors, TimeUnit}
import rx.lang.scala.Observable
import scala.concurrent.duration._
//import scala.concurrent.ExecutionContext.Implicits.global
import rx.lang.scala.schedulers.{ComputationScheduler, ExecutionContextScheduler}
import scala.concurrent.{ExecutionContext, Future}
object RxScalaBasedApproach {
def main(args: Array[String]): Unit = {
val t0 = System.currentTimeMillis()
def queryData1 = () => {
println(s"thread - ${Thread.currentThread().getName}")
Array(6, 7, 8, 9, 10)
}
def queryData2 = () => {
println(s"thread - ${Thread.currentThread().getName}")
Array(1, 2, 3, 4, 5)
}
val service = Executors.newFixedThreadPool(4)
val ec = ExecutionContext.fromExecutor(service)
val s = ExecutionContextScheduler(ec)
val o1 = Observable.from(queryData1())
val o2 = Observable.from(queryData2())
o1.merge(o2)
.flatMap(i => Observable.just(i).subscribeOn(s))
.subscribe(x => println(s"thread - ${Thread.currentThread().getName}, value - $x"))
val t1 = System.currentTimeMillis()
println(s"Elapsed time:${(t1 - t0) / 1000f}")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment