Performance test code on Spark Kafka Consumer Pool
// run the code in spark-shell
// e.g.: ./bin/spark-shell --master "local[3]" --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0-SNAPSHOT
val branch = "master" // change this when changing the version of "spark-sql-kafka"
val attempt = "1" // change this according to the attempt number
// :paste  (enter paste mode in spark-shell before pasting the block below)
import java.io.{File, PrintWriter}
import org.apache.commons.logging.LogFactory
import org.apache.spark.sql.functions.{from_json, struct, to_json}
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQueryListener
import spark.implicits._
// Listener which appends each query progress JSON line to the given file.
class QueryListenerWriteProgressToFile(queryStatusFile: String) extends StreamingQueryListener {
  val logger = LogFactory.getLog(classOf[QueryListenerWriteProgressToFile].getName)
  val writer = new PrintWriter(new File(queryStatusFile))

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    logger.info(s"Query is started for ID ${event.id} and RUNID ${event.runId}")
  }

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    try {
      writer.write(event.progress.json + "\n")
      writer.flush()
    } catch {
      case e: Exception =>
        logger.error("Error writing event[" + event.progress.json + "] to file", e)
    }
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    logger.info(s"Query is terminated for ID ${event.id} and RUNID ${event.runId}")
    writer.close()
  }
}
val queryWritePath = s"/Users/jlim/experiment-SPARK-25151-${branch}-query-v${attempt}.log"
spark.streams.addListener(new QueryListenerWriteProgressToFile(queryWritePath))
val speedEventDf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "truck_speed_events_stream")
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "true")
  .option("maxOffsetsPerTrigger", 5000)
  .load()
val speedSchema = StructType(Seq(
  StructField("eventTime", StringType, nullable = false),
  StructField("eventSource", StringType, nullable = false),
  StructField("truckId", IntegerType, nullable = false),
  StructField("driverId", IntegerType, nullable = false),
  StructField("driverName", StringType, nullable = false),
  StructField("routeId", IntegerType, nullable = false),
  StructField("route", StringType, nullable = false),
  StructField("speed", IntegerType, nullable = false)
))
val query = speedEventDf
  .selectExpr("CAST(value AS STRING) as value")
  .as[String]
  .select(from_json($"value", schema = speedSchema).as("data"))
  .selectExpr("data.*", "to_timestamp(data.eventTime, 'yyyy-MM-dd HH:mm:ss.SSS') AS eventTimestamp")
  .select(to_json(struct($"*")).as("value"))
  .writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // no-op
  }
  .start()

query.awaitTermination()
// commands to read the query progress log and compute simple statistics on the addBatch duration
// brew install jq && brew install datamash
// cat experiment-SPARK-25151-master-query-v1.log | grep "addBatch" | jq '. | {addBatch: .durationMs.addBatch}' | grep "addBatch" | awk -F " " '{print $2}' | datamash max 1 min 1 mean 1 median 1 perc:90 1 perc:95 1 perc:99 1
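// If jq/datamash are not available, roughly the same statistics can be computed back in
// spark-shell once the query has stopped. This is a minimal sketch, not part of the original
// gist: it assumes `queryWritePath` from above is still in scope and that the log already
// contains completed batches (lines without durationMs.addBatch are dropped).
//
// import org.apache.spark.sql.functions.{avg, col, expr, max, min}
//
// val progress = spark.read.json(queryWritePath)
// progress
//   .select(col("durationMs.addBatch").cast("double").as("addBatch"))
//   .where(col("addBatch").isNotNull)
//   .agg(
//     max(col("addBatch")).as("max"),
//     min(col("addBatch")).as("min"),
//     avg(col("addBatch")).as("mean"),
//     expr("percentile(addBatch, array(0.5, 0.9, 0.95, 0.99))").as("p50_p90_p95_p99"))
//   .show(truncate = false)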