Performance test code for the Spark Kafka consumer pool (concurrent access on a topic partition)
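This script is meant to be pasted into spark-shell. A minimal sketch of what it assumes (these are assumptions, not part of the original gist): the Kafka SQL connector is on the classpath, a topic named truck_speed_events_stream with 20 partitions is being produced to on localhost:9092, and the two placeholder values below are defined first, since they are interpolated into the progress-log path later in the script.

// Placeholders used only to label the progress-log file name (hypothetical values):
val branch = "master"   // label for the Spark build under test
val attempt = 1         // run counter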
import java.io.{File, PrintWriter}
import org.apache.commons.logging.LogFactory
import org.apache.spark.sql.functions.{expr, from_json, struct, to_json}
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQueryListener
import spark.implicits._
class QueryListenerWriteProgressToFile(queryStatusFile: String) extends StreamingQueryListener {
  val logger = LogFactory.getLog(classOf[QueryListenerWriteProgressToFile].getName)
  val writer = new PrintWriter(new File(queryStatusFile))

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    logger.info(s"Query started for ID ${event.id} and RUNID ${event.runId}")
  }

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    try {
      // append one JSON progress object per micro-batch
      writer.write(event.progress.json + "\n")
      writer.flush()
    } catch {
      case e: Exception =>
        logger.error("Error writing event [" + event.progress.json + "] to file", e)
    }
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    logger.info(s"Query terminated for ID ${event.id} and RUNID ${event.runId}")
    writer.close()
  }
}
val queryWritePath = s"/Users/jlim/experiment-SPARK-25151/experiment-SPARK-25151-${branch}-query-selfjoin-concurrent-access-v${attempt}.log"
spark.streams.addListener(new QueryListenerWriteProgressToFile(queryWritePath))
val speedSchema = StructType(Seq(
  StructField("eventTime", StringType, nullable = false),
  StructField("eventSource", StringType, nullable = false),
  StructField("truckId", IntegerType, nullable = false),
  StructField("driverId", IntegerType, nullable = false),
  StructField("driverName", StringType, nullable = false),
  StructField("routeId", IntegerType, nullable = false),
  StructField("route", StringType, nullable = false),
  StructField("speed", IntegerType, nullable = false)
))
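// For reference, a payload matching speedSchema would look like this (hypothetical values;
// only the field names and types come from the schema above):
// {"eventTime":"2018-08-28 10:15:00.000","eventSource":"truck_sensor","truckId":10,"driverId":27,
//  "driverName":"Some Driver","routeId":3,"route":"Route 3","speed":65}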
// data distribution across partitions is skewed, so the starting offset had to be set to 0 for some of them
val speedEventDf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "truck_speed_events_stream")
  .option("startingOffsets", """{"truck_speed_events_stream":{"0":5,"1":5,"2":5,"3":5,"4":5,"5":5,"6":5,"7":5,"8":5,"9":0,"10":0,"11":0,"12":5,"13":5,"14":0,"15":5,"16":0,"17":5,"18":5,"19":0}}""")
  .option("failOnDataLoss", "true")
  .option("maxOffsetsPerTrigger", 5000)
  .load()
  .selectExpr("CAST(value AS STRING) as value")
  .as[String]
  .select(from_json($"value", schema = speedSchema).as("data"))
  .selectExpr(
    "data.eventTime AS eventTime", "data.eventSource AS eventSource", "data.truckId AS truckId",
    "data.driverId AS driverId", "data.driverName AS driverName", "data.routeId AS routeId",
    "data.route AS route", "data.speed AS speed")
val speedEventDf2 = speedEventDf
  .selectExpr(
    "eventTime AS eventTime2", "eventSource AS eventSource2", "truckId AS truckId2",
    "driverId AS driverId2", "driverName AS driverName2", "routeId AS routeId2",
    "route AS route2", "speed AS speed2")
val query = speedEventDf
  .join(speedEventDf2, expr("truckId = truckId2 AND driverId = driverId2 AND routeId = routeId2"))
  .select(to_json(struct($"*")).as("value"))
  .writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // no-op sink: the batch is intentionally not processed
  }
  .start()
query.awaitTermination()
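// awaitTermination returns once the query is stopped or fails; at that point the progress log
// written by the listener can be summarized. A rough sketch, assuming one JSON progress object
// per line and the field names of StreamingQueryProgress:
val progress = spark.read.json(queryWritePath)
progress
  .select($"batchId", $"numInputRows", $"inputRowsPerSecond", $"processedRowsPerSecond")
  .orderBy($"batchId")
  .show(truncate = false)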