Skip to content

Instantly share code, notes, and snippets.

@msemelman
Created September 26, 2016 14:19
Show Gist options
  • Save msemelman/d9d2b54ce0c01af8952bc298058dc0d5 to your computer and use it in GitHub Desktop.
Save msemelman/d9d2b54ce0c01af8952bc298058dc0d5 to your computer and use it in GitHub Desktop.
/**
 * Builds a `SparkLauncher` from the given jar and job configuration and starts
 * the application with a [[SparkAppListener]] attached.
 *
 * The master (`cassMaster`) and the application arguments (`appArgs`) are read
 * from the enclosing scope, not from the parameters.
 *
 * @param jar     path of the application jar; it is prefixed with `file://`,
 *                so presumably an absolute local path is expected (TODO confirm)
 * @param jobConf job settings: driver/executor memory, executor cores, extra
 *                Java options, main class and optional event-log settings
 * @return the handle returned by `SparkLauncher.startApplication`
 */
def launch(jar: String, jobConf: JobConf): SparkAppHandle = {
  // Configuration
  val sparkLauncher = new SparkLauncher()
    .setAppResource(s"file://$jar")
    .setDeployMode("cluster")
    .setMaster(cassMaster)
    .setConf(SparkLauncher.DRIVER_MEMORY, jobConf.driverMemory)
    .setConf(SparkLauncher.EXECUTOR_CORES, jobConf.executorCores)
    .setConf(SparkLauncher.EXECUTOR_MEMORY, jobConf.executorMemory)
    .setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, jobConf.driverExtraJavaOptions)
    .setConf(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, jobConf.executorExtraJavaOptions)
    .setVerbose(true)
    .setMainClass(jobConf.clazz)
    .addAppArgs(appArgs: _*)

  // Event logging is only configured when the job asks for it.
  if (jobConf.logEnabled) {
    sparkLauncher.setConf("spark.eventLog.enabled", "true")
    sparkLauncher.setConf("spark.eventLog.dir", jobConf.logDir)
  }

  val listener: SparkAppListener = new SparkAppListener(jobConf)
  sparkLauncher.startApplication(listener)
}
// Listener class
/**
 * `SparkAppHandle.Listener` that logs the job id, the handle's current state
 * and its application id on every callback.
 *
 * @param jobConf configuration of the launched job; only `jobId` is read here
 */
class SparkAppListener(jobConf: JobConf) extends SparkAppHandle.Listener {

  /** Logs the current status when the handle's information changes. */
  override def infoChanged(sparkAppHandle: SparkAppHandle): Unit =
    logStatus(sparkAppHandle)

  /** Logs the current status when the handle's state changes. */
  override def stateChanged(sparkAppHandle: SparkAppHandle): Unit =
    logStatus(sparkAppHandle)

  // Both callbacks emit the same message, so it is built in one place.
  private def logStatus(sparkAppHandle: SparkAppHandle): Unit =
    Logger.info(s"Job: ${jobConf.jobId} State: ${sparkAppHandle.getState} for app ${sparkAppHandle.getAppId}")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment