@ezhulenev
Created August 11, 2015 22:16
Thread-safe Spark SQL Context
import java.util.concurrent.Executors

import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.hive.HiveContext

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object ServerSparkContext {

  private[this] lazy val _sqlContext = {
    val conf = new SparkConf()
      .setAppName("....")

    val sc = new SparkContext(conf)

    // TODO: Bug in Spark: http://stackoverflow.com/questions/30323212
    val ctx = new HiveContext(sc)
    ctx.setConf("spark.sql.hive.convertMetastoreParquet", "false")
    ctx
  }

  // Single-threaded daemon executor: every interaction with the SQLContext is
  // funneled through this one thread.
  private[this] lazy val sparkPool = {
    val threadFactory =
      new ThreadFactoryBuilder()
        .setDaemon(true)
        .setNameFormat("spark-pool-%s")
        .build()

    ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(threadFactory))
  }

  // TODO: Because of some Spark concurrency weirdness, a DataFrame can be created
  // TODO: from a SQL query only in the thread where the SQLContext was initialized.
  // Dirty hack: initialize the SQLContext and load data frames on that same thread.
  def sqlContext: SQLContext = {
    val future = Future(_sqlContext)(sparkPool)
    Await.result(future, 60.seconds)
  }

  def dataFrame(source: String, cached: Boolean = true): DataFrame = {
    val future = Future {
      // Source is an application-specific loader that is not part of this gist.
      val df = Source(source).asDataFrame(_sqlContext)
      if (cached) df.cache() else df
    }(sparkPool)
    Await.result(future, 10.seconds)
  }
}
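
A minimal usage sketch, assuming a Source implementation exists for the names used below (the "events" and "users" names are hypothetical): callers may invoke dataFrame from any thread, for example from request-handling futures, and ServerSparkContext still serializes all Spark access onto its single spark-pool thread.

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

// Hypothetical source names; each call blocks its own thread until the
// single spark-pool thread has produced (and optionally cached) the DataFrame.
val rowCounts: Seq[Future[Long]] = Seq("events", "users").map { name =>
  Future {
    ServerSparkContext.dataFrame(name).count()
  }
}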
@meniluca

Very nice example!

Thank you.
