Analysis of streaming query progress events ingested into a Kafka topic
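
The analysis below assumes each StreamingQueryProgress is already published as JSON to the `app_query_progress` topic. One way to get it there (a minimal sketch, not part of the original gist; the listener class name and producer settings are assumptions) is a `StreamingQueryListener` registered on the application that runs the streaming query:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// hypothetical listener: forwards each progress event's JSON payload to the topic read below
class ProgressToKafkaListener(bootstrapServers: String, topic: String) extends StreamingQueryListener {
  private val props = new Properties()
  props.put("bootstrap.servers", bootstrapServers)
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  private lazy val producer = new KafkaProducer[String, String](props)

  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // StreamingQueryProgress.json gives the same JSON payload this gist parses back with from_json()
    producer.send(new ProducerRecord[String, String](topic, event.progress.id.toString, event.progress.json))
  }
}

// register it on the application side, e.g.:
// spark.streams.addListener(new ProgressToKafkaListener("localhost:9020", "app_query_progress"))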
// assuming we paste the code to `spark-shell`
// spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1
import org.apache.spark.sql.functions.{expr, from_json, max, struct, to_json}
import org.apache.spark.sql.types._
import spark.implicits._

val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9020")
  .option("subscribe", "app_query_progress")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

val schema = StructType(Seq(
  StructField("id", StringType, nullable = false),
  StructField("runId", StringType, nullable = false),
  StructField("name", StringType, nullable = false),
  StructField("timestamp", StringType, nullable = false),
  StructField("batchId", LongType, nullable = false),
  StructField("numInputRows", LongType, nullable = false),
  StructField("inputRowsPerSecond", DoubleType, nullable = false),
  StructField("processedRowsPerSecond", DoubleType, nullable = false),
  StructField("eventTime", StructType(Seq(
    StructField("avg", StringType, nullable = true),
    StructField("max", StringType, nullable = true),
    StructField("min", StringType, nullable = true),
    StructField("watermark", StringType, nullable = true)
  )), nullable = false),
  StructField("durationMs", StructType(Seq(
    StructField("addBatch", LongType, nullable = true),
    StructField("getBatch", LongType, nullable = true),
    StructField("getOffset", LongType, nullable = true),
    StructField("queryPlanning", LongType, nullable = true),
    StructField("triggerExecution", LongType, nullable = true),
    StructField("walCommit", LongType, nullable = true)
  )), nullable = false),
  StructField("stateOperators", ArrayType(StructType(Seq(
    StructField("numRowsTotal", LongType, nullable = false),
    StructField("numRowsUpdated", LongType, nullable = false),
    StructField("memoryUsedBytes", LongType, nullable = false),
    // the field below is only populated with SPARK-24441; remove it if your Spark build doesn't include SPARK-24441
    // (from_json() should tolerate the field being absent; see the quick check right after the schema definition)
StructField("customMetrics", StructType(Seq( | |
StructField("providerLoadedMapSize", LongType, nullable = true))), nullable = true) | |
))), nullable = false), | |
StructField("sources", ArrayType(StructType(Seq( | |
StructField("description", StringType, nullable = false), | |
StructField("numInputRows", IntegerType, nullable = false), | |
StructField("inputRowsPerSecond", DoubleType, nullable = false), | |
StructField("processedRowsPerSecond", DoubleType, nullable = false), | |
// the format of "startOffset" and "endOffset" would be different between datasource types | |
// even for Kafka datasource, we should provide topic name(s) as well as partition number as field name(s) | |
// which is not flexible and cannot be used in practice... so we just try to load with StringType | |
StructField("startOffset", StringType, nullable = true), | |
StructField("endOffset", StringType, nullable = true) | |
))), nullable = false), | |
StructField("sinks", StructType(Seq( | |
StructField("description", StringType, nullable = false) | |
)), nullable = false) | |
)) | |
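
// quick sanity check for the customMetrics concern above (not part of the original analysis):
// from_json() should leave nullable fields that are absent from the JSON as null, so progress
// events produced by a Spark build without SPARK-24441 should still parse; verify with:
val missingFieldCheckSchema = StructType(Seq(
  StructField("present", LongType, nullable = true),
  StructField("absent", LongType, nullable = true)
))
Seq("""{"present": 1}""").toDF("value")
  .select(from_json($"value", missingFieldCheckSchema).as("parsed"))
  .selectExpr("parsed.present", "parsed.absent")
  .show()
// expected: present = 1, absent = null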
// parse json value and apply schema
val jsonDf = df
  .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value", "timestamp")
  .as[(String, String, java.sql.Timestamp)]
  .select(from_json($"value", schema = schema).as("data"), $"timestamp")

// extract max batch id per run id
val maxBatchIdDf = jsonDf
  .selectExpr("data.runId", "data.batchId")
  .groupBy("runId")
  .max("batchId")
  .select($"runId".as("runId_maxBatchId"), $"max(batchId)".as("maxBatchId"))

maxBatchIdDf.show()
// note that multiple progress events can be logged with the same batch id;
// picking the needed columns and applying distinct() helps deduplicate them
val source0InfoPopulatedDf = jsonDf
  .selectExpr("data.id", "data.runId", "data.batchId", "data.sources[0].numInputRows AS source_0_numInputRows",
    "data.sources[0].inputRowsPerSecond AS source_0_inputRowsPerSecond",
    "data.sources[0].processedRowsPerSecond AS source_0_processedRowsPerSecond",
    "data.sources[0].startOffset AS source_0_startOffset",
    "data.sources[0].endOffset AS source_0_endOffset")
  .distinct()

source0InfoPopulatedDf.join(maxBatchIdDf, expr("runId = runId_maxBatchId AND batchId = maxBatchId")).show()
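
// optional follow-up (not in the original gist): since startOffset/endOffset are kept as raw JSON
// strings, get_json_object() should be able to pull out the offsets for a known topic;
// "my_topic" is a placeholder here; substitute the actual topic name of source 0
val source0OffsetsPerTopicDf = source0InfoPopulatedDf
  .selectExpr("runId", "batchId",
    "get_json_object(source_0_startOffset, '$.my_topic') AS source_0_startOffsets_my_topic",
    "get_json_object(source_0_endOffset, '$.my_topic') AS source_0_endOffsets_my_topic")
source0OffsetsPerTopicDf.show()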
val source1InfoPopulatedDf = jsonDf
  .selectExpr("data.id", "data.runId", "data.batchId", "data.sources[1].numInputRows AS source_1_numInputRows",
    "data.sources[1].inputRowsPerSecond AS source_1_inputRowsPerSecond",
    "data.sources[1].processedRowsPerSecond AS source_1_processedRowsPerSecond",
    "data.sources[1].startOffset AS source_1_startOffset",
    "data.sources[1].endOffset AS source_1_endOffset")
  .distinct()

source1InfoPopulatedDf.join(maxBatchIdDf, expr("runId = runId_maxBatchId AND batchId = maxBatchId")).show()

val durationInfoPopulatedDf = jsonDf
  .selectExpr("data.id", "data.runId", "data.batchId", "data.durationMs.*")
  .distinct()

durationInfoPopulatedDf.join(maxBatchIdDf, expr("runId = runId_maxBatchId AND batchId = maxBatchId")).show()

val eventTimeInfoPopulatedDf = jsonDf
  .selectExpr("data.id", "data.runId", "data.batchId", "data.eventTime.avg AS eventTime_avg",
    "data.eventTime.max AS eventTime_max", "data.eventTime.min AS eventTime_min",
    "data.eventTime.watermark AS eventTime_watermark")
  .distinct()

eventTimeInfoPopulatedDf.join(maxBatchIdDf, expr("runId = runId_maxBatchId AND batchId = maxBatchId")).show()

val state0InfoPopulatedDf = jsonDf
  .selectExpr("data.id", "data.runId", "data.batchId",
    "data.stateOperators[0].numRowsTotal AS state_op_0_numRowsTotal",
    "data.stateOperators[0].numRowsUpdated AS state_op_0_numRowsUpdated",
    "data.stateOperators[0].memoryUsedBytes AS state_op_0_memoryUsedBytes",
    "data.stateOperators[0].customMetrics.providerLoadedMapSize AS state_op_0_providerLoadedMapSize",
    "(1.0 * data.stateOperators[0].customMetrics.providerLoadedMapSize / data.stateOperators[0].memoryUsedBytes) AS state_op_0_stateExcessLoadingOverheadPercentage")
  .distinct()

state0InfoPopulatedDf.join(maxBatchIdDf, expr("runId = runId_maxBatchId AND batchId = maxBatchId")).show()

val maxStateExcessOverheadDf = state0InfoPopulatedDf
  .groupBy("runId")
  // effectively works like argmax
  .agg(max(struct($"state_op_0_stateExcessLoadingOverheadPercentage", $"*")).as("data"))
  .select($"data.*")

maxStateExcessOverheadDf.show()
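
// the max(struct(...)) trick above keeps the whole row holding the largest overhead percentage per runId;
// an equivalent formulation with a window function (alternative sketch, not in the original gist):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val overheadWindow = Window.partitionBy("runId")
  .orderBy($"state_op_0_stateExcessLoadingOverheadPercentage".desc)

val maxStateExcessOverheadViaWindowDf = state0InfoPopulatedDf
  .withColumn("rank", row_number().over(overheadWindow))
  .where($"rank" === 1)
  .drop("rank")

maxStateExcessOverheadViaWindowDf.show()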