Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save HeartSaVioR/d831974c3f25c02846f4b15b8d232cc2 to your computer and use it in GitHub Desktop.
Save HeartSaVioR/d831974c3f25c02846f4b15b8d232cc2 to your computer and use it in GitHub Desktop.
Performance test code on Spark Kafka Consumer Pool (concurrent access on topic partition) - v2
import java.io.{File, PrintWriter}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.functions.{from_json, struct, to_json}
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQueryListener
import spark.implicits._
class QueryListenerWriteProgressToStdout() extends StreamingQueryListener with Logging {
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
logWarning(s"Query is started for ID ${event.id} and RUNID ${event.runId}")
}
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
try {
logWarning(event.progress.json + "\n")
} catch {
case e: Exception =>
logError("Error write event[" + event.progress.json + "] to file", e)
}
}
override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
logWarning(s"Query is terminated for ID ${event.id} and RUNID ${event.runId}")
}
}
import org.apache.log4j.Logger
import org.apache.log4j.Level
spark.streams.addListener(new QueryListenerWriteProgressToStdout())
spark.sqlContext.setConf("spark.sql.shuffle.partitions", "5")
val speedSchema = StructType(Seq(
StructField("eventTime", StringType, nullable = false),
StructField("eventSource", StringType, nullable = false),
StructField("truckId", IntegerType, nullable = false),
StructField("driverId", IntegerType, nullable = false),
StructField("driverName", StringType, nullable = false),
StructField("routeId", IntegerType, nullable = false),
StructField("route", StringType, nullable = false),
StructField("speed", IntegerType, nullable = false)
))
// data distribution is skewed so had to set 0 to some partitions
// only uses 4 smallest partitions to let query finish earlier
// one fetch request seems to get 500 records per partition: setting this to 200 per partition
// (so that it can be continuously re-read from next batch)
val speedEventDf = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "truck_speed_events_stream_spark_25151_v1")
.option("startingOffsets", """{"truck_speed_events_stream_spark_25151_v1":{"0":5,"1":5,"2":-1,"3":-1,"4":5,"5":-1,"6":5,"7":-1,"8":-1,"9":0}}""")
.option("failOnDataLoss", "true")
.option("maxOffsetsPerTrigger", 800) // 200 * 4 partitions
.load()
.selectExpr("CAST(value AS STRING) as value")
.as[String]
.select(from_json($"value", schema = speedSchema).as("data"))
.selectExpr("data.eventTime AS eventTime", "data.eventSource AS eventSource", "data.truckId AS truckId", "data.driverId AS driverId", "data.driverName AS driverName", "data.routeId AS routeId", "data.route AS route", "data.speed AS speed")
val speedEventDf2 = speedEventDf
.selectExpr("eventTime AS eventTime2", "eventSource AS eventSource2", "truckId AS truckId2", "driverId AS driverId2", "driverName AS driverName2", "routeId AS routeId2", "route AS route2", "speed AS speed2")
val query = speedEventDf
.join(speedEventDf2, expr("truckId = truckId2 AND driverId = driverId2 AND routeId = routeId2"))
.select(to_json(struct($"*")).as("value"))
.writeStream
.foreachBatch {
(batchDF: DataFrame, batchId: Long) => println(s"batchId $batchId received for writing, content: ${batchDF.collect()}")
}
.start()
query.awaitTermination()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment