Created
April 6, 2018 13:19
-
-
Save HeartSaVioR/ffe3d20362b0b282350e736827242342 to your computer and use it in GitHub Desktop.
Another practice with Spark Structured Streaming: ingesting from Kafka and pushing back to Kafka using continuous processing mode.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.heartsavior.spark.trial | |
import org.apache.spark.sql.catalyst.expressions.StructsToJson | |
import org.apache.spark.sql.functions._ | |
import org.apache.spark.sql.streaming.{OutputMode, Trigger} | |
import org.apache.spark.sql.types._ | |
object SparkTrial2 {

  /**
   * Reads Apache access-log records (JSON) from the Kafka topic "apachelogs-v2",
   * keeps only error responses (HTTP status >= 400), converts TIME_US
   * (microseconds) into TIME_ELAPSED_MS (milliseconds), and writes the result
   * back to the Kafka topic "apache-error-logs" as JSON, using Structured
   * Streaming's continuous processing trigger.
   */
  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession

    val ss = SparkSession
      .builder()
      .appName("Sample")
      .master("local[*]")
      .getOrCreate()

    import ss.implicits._

    val kafkaDf = ss
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "apachelogs-v2")
      .option("startingOffsets", "latest")
      .load()

    kafkaDf.printSchema()

    // Each field type in the schema must strictly conform to the type of the
    // JSON value — for example, you can't use IntegerType for the string "123".
    // Any mismatch turns EVERY column of the affected row into null.
    //
    // NOTE: TIME_RECEIVED_UTC_ISOFORMAT is declared VARCHAR in the Storm SQL
    // DDL below, and an ISO-8601 timestamp string does not parse as DateType,
    // which (per the rule above) would null out the whole row — so it must be
    // StringType here, matching the DDL.
    val schema = StructType(Seq(
      StructField("ID", IntegerType, true),
      StructField("REMOTE_IP", StringType, true),
      StructField("REQUEST_URL", StringType, true),
      StructField("REQUEST_METHOD", StringType, true),
      StructField("STATUS", StringType, true),
      StructField("REQUEST_HEADER_USER_AGENT", StringType, true),
      StructField("TIME_RECEIVED_UTC_ISOFORMAT", StringType, true),
      StructField("TIME_US", StringType, true)
    ))

    // Converts such queries for Storm SQL to Spark Structured Streaming:
    // CREATE EXTERNAL TABLE APACHE_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS VARCHAR, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_US DOUBLE) LOCATION 'kafka://localhost:2181/brokers?topic=apachelogs-v2'
    // CREATE EXTERNAL TABLE APACHE_ERROR_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS INT, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_ELAPSED_MS INT) LOCATION 'kafka://localhost:2181/brokers?topic=apache-error-logs' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
    // INSERT INTO APACHE_ERROR_LOGS SELECT ID, REMOTE_IP, REQUEST_URL, REQUEST_METHOD, CAST(STATUS AS INT) AS STATUS_INT, REQUEST_HEADER_USER_AGENT, TIME_RECEIVED_UTC_ISOFORMAT, (TIME_US / 1000) AS TIME_ELAPSED_MS FROM APACHE_LOGS WHERE (CAST(STATUS AS INT) / 100) >= 4
    val query = kafkaDf
      .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
      .as[(String, String)]
      // Parse the Kafka message value as JSON into a nested "data" struct.
      .select(from_json($"value", schema).as("data"))
      .selectExpr(
        "data.ID", "data.REMOTE_IP", "data.REQUEST_URL", "data.REQUEST_METHOD",
        "CAST(data.STATUS AS INT) as STATUS_INT", "data.REQUEST_HEADER_USER_AGENT",
        "data.TIME_RECEIVED_UTC_ISOFORMAT", "(CAST(data.TIME_US as DOUBLE) / 1000) as TIME_ELAPSED_MS")
      // 4xx and 5xx statuses satisfy STATUS_INT / 100 >= 4; everything else is dropped.
      .where("(STATUS_INT / 100) >= 4")
      // Re-serialize the projected columns to a single JSON "value" column for the Kafka sink.
      .select(to_json(struct($"ID", $"REMOTE_IP", $"REQUEST_URL", $"REQUEST_METHOD", $"STATUS_INT",
        $"REQUEST_HEADER_USER_AGENT", $"TIME_RECEIVED_UTC_ISOFORMAT", $"TIME_ELAPSED_MS")).as("value"))

    val executingQuery = query
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "apache-error-logs")
      .option("checkpointLocation", "/tmp/spark-trial2-checkpoint")
      // Continuous processing: "1 minute" is the asynchronous checkpoint
      // interval, not a micro-batch interval.
      .trigger(Trigger.Continuous("1 minute"))
      .outputMode(OutputMode.Append())
      .start()

    // Block the driver until the streaming query stops or fails.
    executingQuery.awaitTermination()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment