@soonraah
Created June 14, 2021 01:29
Sample code for QueryExecutionListener with Dataset.observe, available from Spark 3.0.0
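// Assumes `spark` is an active SparkSession (for example, in spark-shell)
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.functions.{avg, count}
import org.apache.spark.sql.util.QueryExecutionListener
import scala.util.Random
import spark.implicits._

// Register listener
spark
  .listenerManager
  .register(new QueryExecutionListener {
    override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
      // Look up the metrics row registered under "my_metrics";
      // fall back to a Long sentinel so `num` stays a Long
      val num = qe.observedMetrics
        .get("my_metrics")
        .map(_.getAs[Long]("num"))
        .getOrElse(-100L)
      println(s"num of data: $num")
    }

    override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
  })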

// Make DataFrame
val df = Seq
  .range(0, 1000)
  .map((_, Seq("a", "b", "c")(Random.nextInt(3)), math.random()))
  .toDF("id", "type", "value")

// Observe and process
val dfResult = df
  .observe("my_metrics", count($"*").as("num"))
  .groupBy($"type")
  .agg(avg($"value").as("avg_value"))

// Run
dfResult.show()
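
The anonymous listener above cannot be unregistered later, because nothing keeps a reference to it. Below is a minimal sketch of one alternative, a named listener class plus a helper that returns the registered instance. The names `ObservedCountListener` and `install` are hypothetical and not part of Spark. The sketch assumes the same metric layout as above, a Long field inside a named observation.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Hypothetical helper class: prints one Long metric from one named observation, if present.
class ObservedCountListener(observation: String, metric: String) extends QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    qe.observedMetrics
      .get(observation) // None when the query did not call observe(observation, ...)
      .foreach(row => println(s"${observation}.${metric} = ${row.getAs[Long](metric)}"))

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}

// Hypothetical helper: registers the listener and returns it so the caller can unregister it.
def install(spark: SparkSession): QueryExecutionListener = {
  val listener = new ObservedCountListener("my_metrics", "num")
  spark.listenerManager.register(listener)
  listener
}
```

Usage would look like `val l = install(spark)`, then running the observed query, then `spark.listenerManager.unregister(l)`. The callback may be delivered after the action returns, so unregistering immediately after `show()` could miss the event.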