Structured Streaming version of HDF IoT Trucking: StreamingAnalyticsTruckingRefApp
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{expr, from_json, struct, to_json}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryListener, Trigger}
import org.apache.spark.sql.types._
import org.apache.commons.logging.LogFactory
object StreamingAnalyticsTruckingRefApp {
  // This is a simplified version of the Streaming-Analytics-Trucking-Ref-App, adapted to be
  // compatible with Spark Structured Streaming.
  val logger = LogFactory.getLog(StreamingAnalyticsTruckingRefApp.getClass.getName)

  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      println("params: [broker servers] [checkpoint location (HDFS)]")
      sys.exit(1)
    }

    val brokerServers = args(0)
    val checkpointLocation = args(1)

    val ss = SparkSession
      .builder()
      .appName("StreamingAnalyticsTruckingRefApp")
      //.master("local[*]")
      .getOrCreate()

    import ss.implicits._

    val geoEventDf: DataFrame = getTruckGeoEventDf(ss, brokerServers)
    val speedEventDf: DataFrame = getTruckSpeedEventDf(ss, brokerServers)
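
    // Stream-stream inner join: pair each geo event with a speed event from the same driver
    // whose Kafka timestamp precedes the geo event's by at most one minute. Note that with no
    // watermark defined on either side, Spark keeps the join state indefinitely.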
    val joinedDf = geoEventDf.join(speedEventDf,
      expr("""
        data.driverId = data2.driverId
        AND timestamp >= timestamp2
        AND timestamp <= timestamp2 + interval 1 minute
        """), "inner")

    val abnormalEventsDf = joinedDf
      .filter("data.eventType <> 'Normal'")
      .select($"data.driverId", $"data.driverName", $"data2.route", $"data2.speed")
    val query = abnormalEventsDf
      .select(to_json(struct($"driverId", $"driverName", $"route", $"speed")).as("value"))
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerServers)
      .option("topic", "streaming_analytics_trucking_app_result")
      .option("checkpointLocation", checkpointLocation)
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .outputMode(OutputMode.Append())
      .start()

    ss.streams.addListener(new QueryListenerSendingProgressToKafka(brokerServers))

    query.awaitTermination()
  }
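
  // Publishes the progress JSON of every completed micro-batch to a Kafka topic, keyed by the
  // query's runId, so the query metrics can be consumed downstream.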
  class QueryListenerSendingProgressToKafka(brokerServers: String) extends StreamingQueryListener {
    val props = new Properties
    props.put("bootstrap.servers", brokerServers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val kafkaProducer = new KafkaProducer[String, String](props)

    override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
      logger.info(s"Query is started for ID ${event.id} and RUNID ${event.runId}")
    }

    override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
      try {
        val data = new ProducerRecord[String, String](
          "streaming_analytics_trucking_app_query_progress",
          event.progress.runId.toString, event.progress.json)
        kafkaProducer.send(data)
      } catch {
        case e: Exception =>
          logger.error("Error sending event[" + event.progress.json + "] to Kafka topic", e)
      }
    }

    override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
      logger.info(s"Query is terminated for ID ${event.id} and RUNID ${event.runId}")
    }
  }
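
  // Reads the truck speed topic and parses each record's JSON value into a nested "data2"
  // struct; the Kafka record timestamp is aliased to "timestamp2" so column names do not
  // collide with the geo event stream in the join.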
  private def getTruckSpeedEventDf(ss: SparkSession, bootstrapServers: String) = {
    import ss.implicits._

    val speedEventDf = ss
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", "truck_speed_events_stream")
      .option("startingOffsets", "latest")
      .load()

    val speedSchema = StructType(Seq(
      StructField("eventTime", StringType, nullable = false),
      StructField("eventSource", StringType, nullable = false),
      StructField("truckId", IntegerType, nullable = false),
      StructField("driverId", IntegerType, nullable = false),
      StructField("driverName", StringType, nullable = false),
      StructField("routeId", IntegerType, nullable = false),
      StructField("route", StringType, nullable = false),
      StructField("speed", IntegerType, nullable = false)
    ))

    speedEventDf
      .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value", "timestamp")
      // the Kafka "timestamp" column is a TimestampType, so map it to java.sql.Timestamp
      .as[(String, String, java.sql.Timestamp)]
      .select(from_json($"value", schema = speedSchema).as("data2"), $"timestamp".as("timestamp2"))
  }
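
  // Reads the truck geo event topic and parses each record's JSON value into a nested "data"
  // struct, keeping the Kafka record timestamp for the time-bounded join condition.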
  private def getTruckGeoEventDf(ss: SparkSession, bootstrapServers: String) = {
    import ss.implicits._

    val geoEventDf = ss
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", "truck_events_stream")
      .option("startingOffsets", "latest")
      .load()

    val geoSchema = StructType(Seq(
      StructField("eventTime", StringType, nullable = false),
      StructField("eventSource", StringType, nullable = false),
      StructField("truckId", IntegerType, nullable = false),
      StructField("driverId", IntegerType, nullable = false),
      StructField("driverName", StringType, nullable = false),
      StructField("routeId", IntegerType, nullable = false),
      StructField("eventType", StringType, nullable = false),
      StructField("latitude", DoubleType, nullable = false),
      StructField("longitude", DoubleType, nullable = false),
      StructField("correlationId", LongType, nullable = false),
      StructField("geoAddress", StringType, nullable = true)
    ))

    geoEventDf
      .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value", "timestamp")
      // the Kafka "timestamp" column is a TimestampType, so map it to java.sql.Timestamp
      .as[(String, String, java.sql.Timestamp)]
      .select(from_json($"value", schema = geoSchema).as("data"), $"timestamp")
  }
}
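
For local testing, the two input topics can be fed with JSON records that match the schemas above. The following is a minimal, hypothetical producer sketch: only the topic names and JSON field names are taken from the app, while the SampleEventProducer object, the sample values, and the record keys are made up for illustration.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object SampleEventProducer {
  // usage: SampleEventProducer [broker servers]
  def main(args: Array[String]): Unit = {
    val props = new Properties
    props.put("bootstrap.servers", args(0))
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    // speed event first, then a geo event for the same driver within the one-minute join window;
    // the eventType is non-"Normal" so the joined row survives the abnormal-events filter
    val speedEvent =
      """{"eventTime":"2018-01-01 00:00:00","eventSource":"truck_speed_event","truckId":10,"driverId":11,"driverName":"Sample Driver","routeId":100,"route":"Sample Route","speed":77}"""
    val geoEvent =
      """{"eventTime":"2018-01-01 00:00:10","eventSource":"truck_geo_event","truckId":10,"driverId":11,"driverName":"Sample Driver","routeId":100,"eventType":"Lane Departure","latitude":38.44,"longitude":-90.86,"correlationId":1,"geoAddress":""}"""

    producer.send(new ProducerRecord[String, String]("truck_speed_events_stream", "11", speedEvent))
    producer.send(new ProducerRecord[String, String]("truck_events_stream", "11", geoEvent))
    producer.flush()
    producer.close()
  }
}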