@abelyansky
Created June 3, 2019 18:56
Simulating thread behavior in Spark
import scala.concurrent.{Await, Future, blocking}
import scala.util.{Failure, Success}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// When true, the futures run fire-and-forget inside each task and the driver
// sleeps afterwards so they have time to update the accumulators; when false,
// each task blocks on its future before finishing.
val WORKERS_WAIT_ON_THREADS = true

val (javaThreadCounter, scalaThreadCounter) = (sc.longAccumulator, sc.longAccumulator)
val df = sc.parallelize(1 to 10).toDF()

df.foreachPartition(dataIter => {
  // plain Java thread: always fire-and-forget
  val thread = new Thread {
    override def run(): Unit = {
      Thread.sleep(2000)
      javaThreadCounter.add(1)
      println("in java thread")
    }
  }
  thread.start()

  // Scala future on the global execution context
  val f = Future {
    blocking {
      Thread.sleep(2000)
      scalaThreadCounter.add(1)
      println("in scala thread")
    }
  }

  if (WORKERS_WAIT_ON_THREADS) {
    // launch the future asynchronously; just log its outcome
    f.onComplete {
      case Success(_) => println("partition processing succeeded")
      case Failure(e) => println(s"partition processing failed: ${e.getMessage}")
    }
  } else {
    // launch the future synchronously: block the task until it completes
    Await.result(f, 3.seconds)
  }
  println("in partition")
})

df.collect

if (WORKERS_WAIT_ON_THREADS) {
  // the futures ran fire-and-forget; give them time to update the counters
  Thread.sleep(3000)
}
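The same await-vs-fire-and-forget contrast can be reproduced without Spark. Below is a minimal sketch of mine (not part of the original gist): `ThreadDemo` and the `AtomicLong` counter are my stand-ins, with the atomic counter playing the role of the Spark accumulator.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object ThreadDemo {
  // Returns (counter value right after awaiting one future, final counter value).
  def run(): (Long, Long) = {
    val counter = new AtomicLong(0)

    // fire-and-forget: nothing blocks on this future at this point
    val fireAndForget = Future {
      blocking { Thread.sleep(100); counter.incrementAndGet() }
    }

    // awaited: its increment is guaranteed visible once Await.result returns
    val awaited = Future {
      blocking { Thread.sleep(100); counter.incrementAndGet() }
    }
    Await.result(awaited, 1.second)
    // >= 1 here: the fire-and-forget increment may or may not have landed yet
    val afterAwait = counter.get()

    Await.result(fireAndForget, 1.second) // now both increments have landed
    (afterAwait, counter.get())
  }

  def main(args: Array[String]): Unit = {
    val (afterAwait, total) = run()
    println(s"after await: $afterAwait, total: $total")
  }
}
```

In Spark the fire-and-forget case is the interesting one: accumulator updates made by a leaked thread after the task body has returned may never be reported back to the driver, which appears to be what the driver-side `Thread.sleep(3000)` in the gist is probing.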