Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Simulating thread behavior in Spark
import scala.concurrent.{Future,Await, blocking}
import scala.util.{Failure, Success}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
// Controls which branch each executor task takes below:
// true  → fire-and-forget (attach a callback; driver must sleep later),
// false → the task itself blocks on the future.
val WORKERS_WAIT_ON_THREADS = true

// One accumulator per thread flavor so we can observe whether the
// background work on the executors actually ran.
val javaThreadCounter = sc.longAccumulator
val scalaThreadCounter = sc.longAccumulator

// Small ten-row DataFrame — just enough to fan work out across partitions.
val df = sc.parallelize(1 to 10).toDF()
// For every partition, launch one raw Java thread and one Scala Future,
// each sleeping 2s before bumping its accumulator. Demonstrates that work
// started inside a task can outlive the task unless explicitly awaited.
df.foreachPartition(dataIter => {
  // Plain Java thread. Note: in the async branch below it is never joined,
  // so the task may complete before this thread updates its counter.
  val thread = new Thread {
    override def run(): Unit = { // was deprecated procedure syntax `def run {`
      Thread.sleep(2000)
      javaThreadCounter.add(1)
      println("in java thread")
    }
  }
  thread.start() // side-effecting call: keep the parens

  // Scala Future doing the same work; `blocking` hints the global
  // ExecutionContext to grow its pool instead of starving other tasks.
  val f = Future {
    blocking {
      Thread.sleep(2000)
      scalaThreadCounter.add(1)
      println("in scala thread")
    }
  }

  if (WORKERS_WAIT_ON_THREADS) {
    // Async: only attach a completion callback — the task returns without
    // waiting, so the driver sleeps later hoping the counters get updated.
    f.onComplete {
      case Success(_) => println("partition processing succeeded")
      case Failure(_) => println("partition processing failed")
    }
  } else {
    // Sync: block the task until the future completes.
    // `3000.milliseconds` replaces postfix `3000 millisecond`, which needs
    // the scala.language.postfixOps import to compile warning-free.
    Await.result(f, 3000.milliseconds)
    // Also wait for the Java thread — the original never joined it, so in
    // "sync" mode its accumulator update could still be lost.
    thread.join(3000)
  }
  println("in partition")
})
// Materialize the DataFrame on the driver (result discarded). This is a
// second, separate action — the foreachPartition above already ran.
df.collect
// NOTE(review): Thread.sleep as coordination — in the async branch the
// tasks return before their background threads finish, so the driver
// sleeps and hopes the accumulator updates arrive within 3s. Fragile;
// there is no guarantee the executor-side threads have run by then.
if (WORKERS_WAIT_ON_THREADS) {
// wait for launched threads to update counters
Thread.sleep(3000)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.