Created
June 3, 2019 18:56
-
-
Save abelyansky/904bf69852ff33909e08faefe249399e to your computer and use it in GitHub Desktop.
Simulating thread behavior in Spark
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// Simulates how background threads started inside Spark tasks behave.
// Intended for spark-shell: `sc` and the `toDF()` implicit are provided by the shell.
//
// WORKERS_WAIT_ON_THREADS = true  -> each task blocks until its Java thread and its
//                                    Scala Future have finished before returning.
// WORKERS_WAIT_ON_THREADS = false -> each task returns right away (fire-and-forget);
//                                    the driver only sleeps to give the threads time.
val WORKERS_WAIT_ON_THREADS = true

// One increment per partition from each kind of background thread.
val javaThreadCounter = sc.longAccumulator("javaThreadCounter")
val scalaThreadCounter = sc.longAccumulator("scalaThreadCounter")

val df = sc.parallelize(1 to 10).toDF()

// The explicit parameter type selects the Scala-function overload of foreachPartition;
// an untyped lambda is ambiguous against ForeachPartitionFunction under Scala 2.12+.
// The partition iterator is intentionally not consumed: only the threads matter here.
df.foreachPartition((dataIter: Iterator[org.apache.spark.sql.Row]) => {
  // Background unit #1: a plain Java thread.
  val thread = new Thread {
    override def run(): Unit = {
      Thread.sleep(2000)
      javaThreadCounter.add(1)
      println("in java thread")
    }
  }
  thread.start()

  // Background unit #2: a Scala Future on the global execution context;
  // `blocking` tells the pool that this body sleeps.
  val f = Future {
    blocking {
      Thread.sleep(2000)
      scalaThreadCounter.add(1)
      println("in scala thread")
    }
  }

  if (WORKERS_WAIT_ON_THREADS) {
    // Synchronous: keep the task alive until both background units are done.
    // Await.result rethrows a failure of `f` and throws if it takes longer than 3s.
    thread.join()
    Await.result(f, 3.seconds)
  } else {
    // Asynchronous: only register a callback; the task returns without waiting.
    f.onComplete {
      case Success(_) => println("partition processing succeeded")
      case Failure(e) => println(s"partition processing failed: $e")
    }
  }
  println("in partition")
})

// Runs a second Spark job over `df`.
// NOTE(review): its purpose in this experiment is not evident from the script — confirm intent.
df.collect

if (!WORKERS_WAIT_ON_THREADS) {
  // Tasks did not wait, so give their background threads time to finish.
  // NOTE(review): Spark presumably ships accumulator updates back with the task result,
  // so increments made after a task has finished may never reach the driver,
  // regardless of this sleep — confirm.
  Thread.sleep(3000)
}

// Show the outcome of the experiment.
println(s"javaThreadCounter=${javaThreadCounter.value}, scalaThreadCounter=${scalaThreadCounter.value}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment