Proof-of-concept code of how to extend Spark listeners for custom monitoring of Spark metrics
// Proof-of-concept code of how to extend Spark listeners for custom monitoring of Spark metrics
// When using this from the spark-shell, use the REPL command :paste and copy-paste the following code
// Tested on Spark 2.1.0, March 2017
import org.apache.spark.scheduler._
import org.apache.log4j.LogManager
val logger = LogManager.getLogger("CustomListener")
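class CustomListener extends SparkListener {
  // Called once per completed stage; stageInfo.taskMetrics holds the stage-level aggregated metrics.
  // executorRunTime is reported in milliseconds, executorCpuTime in nanoseconds.
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    logger.warn(s"Stage completed, runTime: ${stageCompleted.stageInfo.taskMetrics.executorRunTime}, " +
      s"cpuTime: ${stageCompleted.stageInfo.taskMetrics.executorCpuTime}")
  }
}
val myListener = new CustomListener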
// sc is the active SparkContext
sc.addSparkListener(myListener)
// Run a simple Spark job and note the additional warning messages that CustomListener emits
// with the Spark execution metrics. For example, run:
spark.time(sql("select count(*) from range(1e4) cross join range(1e4)").show)
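
// A possible extension, sketched under assumptions rather than taken from the original gist:
// the same listener mechanism can collect task-level metrics by overriding onTaskEnd and
// summing them in counters. The names TaskMetricsListener, taskCount, totalRunTimeMs,
// totalCpuTimeNs and taskListener are hypothetical and chosen only for this illustration.
// Like the code above, it assumes a spark-shell session where sc and spark are defined.
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class TaskMetricsListener extends SparkListener {
  // Atomic counters, because listener callbacks run on a different thread than the REPL
  val taskCount = new AtomicLong(0L)
  val totalRunTimeMs = new AtomicLong(0L)
  val totalCpuTimeNs = new AtomicLong(0L)

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics may be null for a failed task, so wrap it in Option before reading it
    Option(taskEnd.taskMetrics).foreach { metrics =>
      taskCount.incrementAndGet()
      totalRunTimeMs.addAndGet(metrics.executorRunTime)  // milliseconds
      totalCpuTimeNs.addAndGet(metrics.executorCpuTime)  // nanoseconds
    }
  }
}
val taskListener = new TaskMetricsListener
sc.addSparkListener(taskListener)
// Run any job, then read the totals. Listener events are delivered asynchronously,
// so the counters may briefly lag behind the end of the job.
spark.range(1000000).count()
println(s"tasks: ${taskListener.taskCount.get}, " +
  s"runTime (ms): ${taskListener.totalRunTimeMs.get}, " +
  s"cpuTime (ns): ${taskListener.totalCpuTimeNs.get}")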