Structured Streaming version of HDF IoT Trucking: StreamingAnalyticsTruckingRefApp
import java.util.Properties

import org.apache.commons.logging.LogFactory
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{expr, from_json, struct, to_json}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryListener, Trigger}
import org.apache.spark.sql.types._
object StreamingAnalyticsTruckingRefApp {
  // This is a simplified version of the Streaming-Analytics-Trucking-Ref-App, ported to
  // Spark Structured Streaming.
  val logger = LogFactory.getLog(StreamingAnalyticsTruckingRefApp.getClass.getName)
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      println("params: [broker servers] [checkpoint location (HDFS)]")
      sys.exit(1)
    }

    val brokerServers = args(0)
    val checkpointLocation = args(1)

    val ss = SparkSession
      .builder()
      .appName("StreamingAnalyticsTruckingRefApp")
      //.master("local[*]")
      .getOrCreate()

    import ss.implicits._

    val geoEventDf: DataFrame = getTruckGeoEventDf(ss, brokerServers)
    val speedEventDf: DataFrame = getTruckSpeedEventDf(ss, brokerServers)
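
    // Stream-stream inner join keyed on driverId, with a time bound: a geo event is only
    // paired with speed events that arrived at most one minute before it, based on the
    // `timestamp` column provided by the Kafka source. Note that since neither stream
    // defines a watermark, Spark must retain all past rows as join state indefinitely.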
    val joinedDf = geoEventDf.join(speedEventDf,
      expr("""
        data.driverId = data2.driverId
        AND timestamp >= timestamp2
        AND timestamp <= timestamp2 + interval 1 minute
        """), "inner")
    // Keep only abnormal driving events and project the fields to publish.
    val abnormalEventsDf = joinedDf
      .filter("data.eventType <> 'Normal'")
      .select($"data.driverId", $"data.driverName", $"data2.route", $"data2.speed")

    // Serialize each row to JSON and publish it to the result topic, triggering a
    // micro-batch every 10 seconds.
    val query = abnormalEventsDf
      .select(to_json(struct($"driverId", $"driverName", $"route", $"speed")).as("value"))
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerServers)
      .option("topic", "streaming_analytics_trucking_app_result")
      .option("checkpointLocation", checkpointLocation)
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .outputMode(OutputMode.Append())
      .start()

    ss.streams.addListener(new QueryListenerSendingProgressToKafka(brokerServers))

    query.awaitTermination()
  }
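
  // StreamingQueryListener that forwards each micro-batch's progress report (as JSON) to
  // a Kafka topic, keyed by the query's run ID, so query metrics can be monitored
  // outside the driver.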
  class QueryListenerSendingProgressToKafka(brokerServers: String) extends StreamingQueryListener {
    val props = new Properties
    props.put("bootstrap.servers", brokerServers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    // One long-lived producer per listener instance, shared across all callbacks.
    val kafkaProducer = new KafkaProducer[String, String](props)

    override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
      logger.info(s"Query is started for ID ${event.id} and RUNID ${event.runId}")
    }

    override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
      try {
        val data = new ProducerRecord[String, String](
          "streaming_analytics_trucking_app_query_progress",
          event.progress.runId.toString, event.progress.json)
        kafkaProducer.send(data)
      } catch {
        case e: Exception =>
          logger.error("Error sending event[" + event.progress.json + "] to Kafka topic", e)
      }
    }

    override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
      logger.info(s"Query is terminated for ID ${event.id} and RUNID ${event.runId}")
    }
  }
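
  // To inspect the progress reports (assuming a standard Kafka installation with the
  // console consumer available on the path), something like:
  //   kafka-console-consumer.sh --bootstrap-server <brokers> \
  //     --topic streaming_analytics_trucking_app_query_progress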
  private def getTruckSpeedEventDf(ss: SparkSession, bootstrapServers: String) = {
    import ss.implicits._

    val speedEventDf = ss
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", "truck_speed_events_stream")
      .option("startingOffsets", "latest")
      .load()

    val speedSchema = StructType(Seq(
      StructField("eventTime", StringType, nullable = false),
      StructField("eventSource", StringType, nullable = false),
      StructField("truckId", IntegerType, nullable = false),
      StructField("driverId", IntegerType, nullable = false),
      StructField("driverName", StringType, nullable = false),
      StructField("routeId", IntegerType, nullable = false),
      StructField("route", StringType, nullable = false),
      StructField("speed", IntegerType, nullable = false)
    ))
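
    // Parse the JSON payload into a nested struct column. from_json yields a null struct
    // for records whose value fails to parse against the schema; those rows then fail the
    // join condition, so malformed messages are silently dropped.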
    speedEventDf
      .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value", "timestamp")
      // the Kafka source exposes `timestamp` as TimestampType, so the tuple encoder
      // must use java.sql.Timestamp rather than Long
      .as[(String, String, java.sql.Timestamp)]
      .select(from_json($"value", schema = speedSchema).as("data2"), $"timestamp".as("timestamp2"))
  }
  private def getTruckGeoEventDf(ss: SparkSession, bootstrapServers: String) = {
    import ss.implicits._

    val geoEventDf = ss
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", "truck_events_stream")
      .option("startingOffsets", "latest")
      .load()

    val geoSchema = StructType(Seq(
      StructField("eventTime", StringType, nullable = false),
      StructField("eventSource", StringType, nullable = false),
      StructField("truckId", IntegerType, nullable = false),
      StructField("driverId", IntegerType, nullable = false),
      StructField("driverName", StringType, nullable = false),
      StructField("routeId", IntegerType, nullable = false),
      StructField("eventType", StringType, nullable = false),
      StructField("latitude", DoubleType, nullable = false),
      StructField("longitude", DoubleType, nullable = false),
      StructField("correlationId", LongType, nullable = false),
      StructField("geoAddress", StringType, nullable = true)
    ))

    geoEventDf
      .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value", "timestamp")
      .as[(String, String, java.sql.Timestamp)]
      .select(from_json($"value", schema = geoSchema).as("data"), $"timestamp")
  }
}
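
Usage note: the app needs the Kafka source for Structured Streaming on the classpath. Assuming Spark 2.3 on Scala 2.11 (adjust both to match your cluster), a launch could look like:

spark-submit --class StreamingAnalyticsTruckingRefApp --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 trucking-ref-app.jar <broker servers> <hdfs checkpoint dir>

where trucking-ref-app.jar is a placeholder for the assembled application jar.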