Analysis of streaming query progress events ingested into a Kafka topic
// assuming we paste this code into `spark-shell`, launched as:
// spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1
import org.apache.spark.sql.functions.{expr, from_json, max, struct, to_json}
import org.apache.spark.sql.types._
import spark.implicits._
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "app_query_progress")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
val schema = StructType(Seq(
  StructField("id", StringType, nullable = false),
  StructField("runId", StringType, nullable = false),
  StructField("name", StringType, nullable = false),
  StructField("timestamp", StringType, nullable = false),
  StructField("batchId", LongType, nullable = false),
  StructField("numInputRows", LongType, nullable = false),
  StructField("inputRowsPerSecond", DoubleType, nullable = false),
  StructField("processedRowsPerSecond", DoubleType, nullable = false),
  StructField("eventTime", StructType(Seq(
    StructField("avg", StringType, nullable = true),
    StructField("max", StringType, nullable = true),
    StructField("min", StringType, nullable = true),
    StructField("watermark", StringType, nullable = true)
  )), nullable = false),
  StructField("durationMs", StructType(Seq(
    StructField("addBatch", LongType, nullable = true),
    StructField("getBatch", LongType, nullable = true),
    StructField("getOffset", LongType, nullable = true),
    StructField("queryPlanning", LongType, nullable = true),
    StructField("triggerExecution", LongType, nullable = true),
    StructField("walCommit", LongType, nullable = true)
  )), nullable = false),
  StructField("stateOperators", ArrayType(StructType(Seq(
    StructField("numRowsTotal", LongType, nullable = false),
    StructField("numRowsUpdated", LongType, nullable = false),
    StructField("memoryUsedBytes", LongType, nullable = false),
    // the field below is only populated with SPARK-24441; from_json() returns
    // null for nullable fields missing from the JSON, so it is safe to keep
    // even on Spark builds without SPARK-24441 (see the sanity check below)
    StructField("customMetrics", StructType(Seq(
      StructField("providerLoadedMapSize", LongType, nullable = true))), nullable = true)
  ))), nullable = false),
  StructField("sources", ArrayType(StructType(Seq(
    StructField("description", StringType, nullable = false),
    StructField("numInputRows", LongType, nullable = false),
    StructField("inputRowsPerSecond", DoubleType, nullable = false),
    StructField("processedRowsPerSecond", DoubleType, nullable = false),
    // the format of "startOffset" and "endOffset" differs across data source
    // types; even for the Kafka source the schema would have to spell out
    // topic names and partition numbers as field names, which is impractical,
    // so we simply load both as StringType
    StructField("startOffset", StringType, nullable = true),
    StructField("endOffset", StringType, nullable = true)
  ))), nullable = false),
  StructField("sink", StructType(Seq(
    StructField("description", StringType, nullable = false)
  )), nullable = false)
))
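// quick sanity check (a minimal sketch, not part of the original analysis):
// from_json() yields null for schema fields that are absent from the JSON,
// which is why the optional customMetrics struct above is harmless on Spark
// builds without SPARK-24441
val sanitySchema = StructType(Seq(
  StructField("present", LongType, nullable = true),
  StructField("missing", LongType, nullable = true)))
Seq("""{"present": 1}""").toDF("value")
  .select(from_json($"value", sanitySchema).as("data"))
  .selectExpr("data.present", "data.missing")
  .show() // "missing" comes back as null rather than failing the parse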
// parse the JSON value and apply the schema
val jsonDf = df
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value", "timestamp")
  .as[(String, String, java.sql.Timestamp)]
  .select(from_json($"value", schema = schema).as("data"), $"timestamp")
// extract the max batch id per run id
val maxBatchIdDf = jsonDf
  .selectExpr("data.runId", "data.batchId")
  .groupBy("runId")
  .max("batchId")
  .select($"runId".as("runId_maxBatchId"), $"max(batchId)".as("maxBatchId"))
maxBatchIdDf.show()
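// alternatively (a sketch of an equivalent approach), a window function can
// tag the latest batch per run without the separate join used below
import org.apache.spark.sql.expressions.Window
val latestBatchPerRunDf = jsonDf
  .selectExpr("data.runId", "data.batchId")
  .distinct()
  .withColumn("maxBatchId", max($"batchId").over(Window.partitionBy($"runId")))
  .where($"batchId" === $"maxBatchId")
latestBatchPerRunDf.show()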
// note that multiple progress events can be logged with the same batch id;
// selecting only the needed columns and applying distinct() deduplicates them
val source0InfoPopulatedDf = jsonDf
  .selectExpr("data.id", "data.runId", "data.batchId", "data.sources[0].numInputRows AS source_0_numInputRows",
    "data.sources[0].inputRowsPerSecond AS source_0_inputRowsPerSecond",
    "data.sources[0].processedRowsPerSecond AS source_0_processedRowsPerSecond",
    "data.sources[0].startOffset AS source_0_startOffset",
    "data.sources[0].endOffset AS source_0_endOffset")
  .distinct()
source0InfoPopulatedDf.join(maxBatchIdDf, expr("runId = runId_maxBatchId AND batchId = maxBatchId")).show()
val source1InfoPopulatedDf = jsonDf
  .selectExpr("data.id", "data.runId", "data.batchId", "data.sources[1].numInputRows AS source_1_numInputRows",
    "data.sources[1].inputRowsPerSecond AS source_1_inputRowsPerSecond",
    "data.sources[1].processedRowsPerSecond AS source_1_processedRowsPerSecond",
    "data.sources[1].startOffset AS source_1_startOffset",
    "data.sources[1].endOffset AS source_1_endOffset")
  .distinct()
source1InfoPopulatedDf.join(maxBatchIdDf, expr("runId = runId_maxBatchId AND batchId = maxBatchId")).show()
val durationInfoPopulatedDf = jsonDf
  .selectExpr("data.id", "data.runId", "data.batchId", "data.durationMs.*")
  .distinct()
durationInfoPopulatedDf.join(maxBatchIdDf, expr("runId = runId_maxBatchId AND batchId = maxBatchId")).show()
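// a derived view (a sketch, not in the original): the share of each trigger
// spent inside addBatch, which hints at how much of the batch time goes to
// sink writes and state updates
durationInfoPopulatedDf
  .selectExpr("id", "runId", "batchId",
    "(1.0 * addBatch / triggerExecution) AS addBatchRatio")
  .show()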
val eventTimeInfoPopulatedDf = jsonDf
  .selectExpr("data.id", "data.runId", "data.batchId", "data.eventTime.avg AS eventTime_avg",
    "data.eventTime.max AS eventTime_max", "data.eventTime.min AS eventTime_min",
    "data.eventTime.watermark AS eventTime_watermark")
  .distinct()
eventTimeInfoPopulatedDf.join(maxBatchIdDf, expr("runId = runId_maxBatchId AND batchId = maxBatchId")).show()
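// a sketch (assumes the ISO-8601 strings emitted in the progress JSON cast
// cleanly to TIMESTAMP): how many seconds the watermark lags behind the max
// observed event time
eventTimeInfoPopulatedDf
  .selectExpr("id", "runId", "batchId",
    "unix_timestamp(CAST(eventTime_max AS TIMESTAMP)) - unix_timestamp(CAST(eventTime_watermark AS TIMESTAMP)) AS watermarkLagSeconds")
  .show()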
val state0InfoPopulatedDf = jsonDf
  .selectExpr("data.id", "data.runId", "data.batchId",
    "data.stateOperators[0].numRowsTotal AS state_op_0_numRowsTotal",
    "data.stateOperators[0].numRowsUpdated AS state_op_0_numRowsUpdated",
    "data.stateOperators[0].memoryUsedBytes AS state_op_0_memoryUsedBytes",
    "data.stateOperators[0].customMetrics.providerLoadedMapSize AS state_op_0_providerLoadedMapSize",
    "(1.0 * data.stateOperators[0].customMetrics.providerLoadedMapSize / data.stateOperators[0].memoryUsedBytes) AS state_op_0_stateExcessLoadingOverheadPercentage")
  .distinct()
state0InfoPopulatedDf.join(maxBatchIdDf, expr("runId = runId_maxBatchId AND batchId = maxBatchId")).show()
val maxStateExcessOverheadDf = state0InfoPopulatedDf
  .groupBy("runId")
  // max over a struct compares fields left to right, so placing the overhead
  // percentage first effectively works like argmax: it keeps the whole row
  // with the highest percentage per run
  .agg(max(struct($"state_op_0_stateExcessLoadingOverheadPercentage", $"*")).as("data"))
  .select($"data.*")
maxStateExcessOverheadDf.show()
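// a minimal sketch (not part of the original analysis): the summary could be
// published back to Kafka with to_json(); the "app_query_summary" topic name
// is an assumption for illustration
maxStateExcessOverheadDf
  .select(to_json(struct($"*")).as("value"))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "app_query_summary")
  .save()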