Skip to content

Instantly share code, notes, and snippets.

@GrigorievNick
Last active January 12, 2023 17:30
Show Gist options
  • Save GrigorievNick/e7cf9ec5584b417d9719e2812722e6d3 to your computer and use it in GitHub Desktop.
Save GrigorievNick/e7cf9ec5584b417d9719e2812722e6d3 to your computer and use it in GitHub Desktop.
Spark Observation API bug: reproduction
import org.apache.spark.sql.Observation
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.functions._
import org.apache.spark.sql.util.QueryExecutionListener
import org.scalatest.FunSuite
/**
 * Reproducer for a Spark Observation API issue.
 *
 * `Observation.get` blocks until the observed metrics have been reported.
 * Per the accompanying note, the metrics arrive when the action is `count()`.
 * With `foreach` / `foreachPartition`, however, `get` never returns, because
 * (per that note) those actions do not trigger a `QueryExecutionListener`.
 * To compare, swap the active action line for one of the commented alternatives.
 */
class TestObservationWithForeach extends FunSuite {
  test("observation") {
    val spark: SparkSession = SparkSession.builder()
      .master("local")
      .getOrCreate()
    import spark.implicits._
    try {
      // Print every query execution the session reports. This shows whether
      // the chosen action reaches the listener manager at all.
      spark.listenerManager.register(new QueryExecutionListener {
        override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
          println(qe)
        override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
          println(qe)
      })
      val observation = Observation("test")
      spark
        .range(200)
        .repartition(5)
        // Alias the metric so the assertion below does not depend on the
        // column name Spark would otherwise generate.
        .observe(observation, count($"id").as("rows"))
        .foreach(println(_))
      // .foreachPartition { it: Iterator[java.lang.Long] => it.foreach(println) }
      // .count()
      val metrics = observation.get // blocks until the observed metrics are reported
      println(metrics)
      assert(metrics("rows") == 200L)
    } finally {
      // Release the local session so it does not leak into other suites.
      spark.stop()
    }
  }
}
@GrigorievNick
Copy link
Author

Uncomment `.count()` and the test will pass, but with `foreach` or `foreachPartition` it hangs,
because these two actions do not trigger the `QueryExecutionListener`.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment