Skip to content

Instantly share code, notes, and snippets.

@HeartSaVioR
Created April 6, 2018 13:19
Show Gist options
  • Save HeartSaVioR/ffe3d20362b0b282350e736827242342 to your computer and use it in GitHub Desktop.
Save HeartSaVioR/ffe3d20362b0b282350e736827242342 to your computer and use it in GitHub Desktop.
Another practice with Structured Streaming: ingesting from Kafka and pushing back to Kafka using continuous processing mode
package net.heartsavior.spark.trial
import org.apache.spark.sql.catalyst.expressions.StructsToJson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types._
/**
 * Reads Apache access-log JSON records from the Kafka topic "apachelogs-v2",
 * keeps only error responses (HTTP status >= 400), and writes the filtered
 * rows as JSON to the Kafka topic "apache-error-logs" using Spark Structured
 * Streaming in continuous processing mode.
 *
 * This mirrors the Storm SQL statements quoted in the comments below.
 */
object SparkTrial2 {

  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession

    val ss = SparkSession
      .builder()
      .appName("Sample")
      .master("local[*]")
      .getOrCreate()

    import ss.implicits._

    // Source: Kafka topic "apachelogs-v2", starting from the latest offsets
    // (records produced before the job starts are not replayed).
    val kafkaDf = ss
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "apachelogs-v2")
      .option("startingOffsets", "latest")
      .load()

    // FIX: the original evaluated `kafkaDf.isStreaming` and discarded the
    // result (a no-op statement); print it so the sanity check is observable.
    println(s"kafkaDf.isStreaming = ${kafkaDf.isStreaming}")
    kafkaDf.printSchema()

    // each type of field in schema should strictly conform to the type of JSON value
    // for example, you can't put IntegerType for "123"
    // any mismatch will make all values in row to "null"
    //
    // FIX: TIME_RECEIVED_UTC_ISOFORMAT was declared DateType, but the field
    // name and the Storm SQL DDL below say it carries an ISO-format timestamp
    // *string* (VARCHAR). An ISO-8601 date+time value does not parse as
    // DateType, which — per the warning above — would null out every field of
    // the row. Keep it as StringType, matching the DDL.
    val schema = StructType(Seq(
      StructField("ID", IntegerType, nullable = true),
      StructField("REMOTE_IP", StringType, nullable = true),
      StructField("REQUEST_URL", StringType, nullable = true),
      StructField("REQUEST_METHOD", StringType, nullable = true),
      StructField("STATUS", StringType, nullable = true),
      StructField("REQUEST_HEADER_USER_AGENT", StringType, nullable = true),
      StructField("TIME_RECEIVED_UTC_ISOFORMAT", StringType, nullable = true),
      StructField("TIME_US", StringType, nullable = true)
    ))

    // Converts such queries for Storm SQL to Spark Structured Streaming
    // CREATE EXTERNAL TABLE APACHE_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS VARCHAR, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_US DOUBLE) LOCATION 'kafka://localhost:2181/brokers?topic=apachelogs-v2'
    // CREATE EXTERNAL TABLE APACHE_ERROR_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS INT, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_ELAPSED_MS INT) LOCATION 'kafka://localhost:2181/brokers?topic=apache-error-logs' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
    // INSERT INTO APACHE_ERROR_LOGS SELECT ID, REMOTE_IP, REQUEST_URL, REQUEST_METHOD, CAST(STATUS AS INT) AS STATUS_INT, REQUEST_HEADER_USER_AGENT, TIME_RECEIVED_UTC_ISOFORMAT, (TIME_US / 1000) AS TIME_ELAPSED_MS FROM APACHE_LOGS WHERE (CAST(STATUS AS INT) / 100) >= 4
    val query = kafkaDf
      // Kafka key/value arrive as binary; cast to strings before JSON parsing.
      .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
      .as[(String, String)]
      // Parse the JSON payload into a nested struct column named "data".
      .select(from_json($"value", schema = schema).as("data"))
      .selectExpr(
        "data.ID", "data.REMOTE_IP", "data.REQUEST_URL", "data.REQUEST_METHOD",
        "CAST(data.STATUS AS INT) as STATUS_INT", "data.REQUEST_HEADER_USER_AGENT",
        "data.TIME_RECEIVED_UTC_ISOFORMAT", "(CAST(data.TIME_US as DOUBLE) / 1000) as TIME_ELAPSED_MS")
      // Error responses only: 4xx and 5xx status codes.
      .where("(STATUS_INT / 100) >= 4")
      // Kafka sink expects a "value" column; serialize the row back to JSON.
      .select(to_json(struct($"ID", $"REMOTE_IP", $"REQUEST_URL", $"REQUEST_METHOD", $"STATUS_INT",
        $"REQUEST_HEADER_USER_AGENT", $"TIME_RECEIVED_UTC_ISOFORMAT", $"TIME_ELAPSED_MS")).as("value"))

    // Sink: Kafka topic "apache-error-logs", continuous processing mode with a
    // 1-minute checkpoint interval (in continuous mode the trigger interval is
    // how often checkpoints are taken, not a micro-batch interval).
    val executingQuery = query
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "apache-error-logs")
      .option("checkpointLocation", "/tmp/spark-trial2-checkpoint")
      .trigger(Trigger.Continuous("1 minute"))
      .outputMode(OutputMode.Append())
      .start()

    // Block the driver until the streaming query terminates.
    executingQuery.awaitTermination()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment