Skip to content

Instantly share code, notes, and snippets.

@HeartSaVioR
Created April 4, 2018 08:43
Show Gist options
  • Save HeartSaVioR/e9f5ce77cbad49f0c4ca3feac999ea51 to your computer and use it in GitHub Desktop.
Save HeartSaVioR/e9f5ce77cbad49f0c4ca3feac999ea51 to your computer and use it in GitHub Desktop.
Practice: a Structured Streaming job that ingests from Kafka and writes to the console
package net.heartsavior.spark.trial
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types._
object SparkTrial {
  import org.apache.spark.sql.{DataFrame, SparkSession}

  /** Kafka bootstrap servers used when no first command-line argument is given. */
  private val DefaultBootstrapServers = "localhost:9092"

  /** Kafka topic subscribed to when no second command-line argument is given. */
  private val DefaultTopic = "apachelogs-v2"

  /**
   * Schema of the JSON documents carried in the Kafka record values.
   *
   * TIME_RECEIVED_ISOFORMAT is read as a timestamp. Spark's JSON parsing uses an
   * ISO-8601 pattern for TimestampType by default (`timestampFormat` option),
   * whereas DateType would keep only the date part. The field is not used by the
   * aggregation below.
   */
  private val LogSchema: StructType = StructType(Seq(
    StructField("ID", StringType, nullable = true),
    StructField("REQUEST_URL", StringType, nullable = true),
    StructField("STATUS", StringType, nullable = true),
    StructField("REQUEST_METHOD", StringType, nullable = true),
    StructField("TIME_RECEIVED_ISOFORMAT", TimestampType, nullable = true)
  ))

  /**
   * Counts records per (REQUEST_URL, STATUS) pair, most frequent first.
   *
   * @param kafkaDf DataFrame with a `value` column that can be cast to STRING and
   *                holds JSON documents matching [[LogSchema]]
   * @return one row per (REQUEST_URL, STATUS) plus a `count` column, sorted by
   *         `count` descending
   */
  private def countByUrlAndStatus(kafkaDf: DataFrame): DataFrame =
    kafkaDf
      // Only the record value is needed; the Kafka key plays no part in the result.
      .selectExpr("CAST(value AS STRING) AS value")
      .select(from_json(col("value"), LogSchema).as("data"))
      .select(col("data.REQUEST_URL"), col("data.STATUS"))
      .groupBy(col("REQUEST_URL"), col("STATUS"))
      .count()
      .orderBy(col("count").desc)

  /**
   * Streams JSON log records from Kafka and prints the running count per
   * (REQUEST_URL, STATUS) to the console on a 1-second processing-time trigger.
   *
   * @param args optional overrides: `args(0)` = Kafka bootstrap servers
   *             (default "localhost:9092"), `args(1)` = topic (default "apachelogs-v2")
   */
  def main(args: Array[String]): Unit = {
    val bootstrapServers = args.lift(0).getOrElse(DefaultBootstrapServers)
    val topic = args.lift(1).getOrElse(DefaultTopic)

    val spark = SparkSession
      .builder()
      .appName("Sample")
      .master("local[*]")
      .getOrCreate()

    try {
      val kafkaDf = spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topic)
        .option("startingOffsets", "earliest")
        .load()

      // Print the check explicitly; as a bare expression its result would be discarded.
      println(s"isStreaming: ${kafkaDf.isStreaming}")
      kafkaDf.printSchema()

      val query = countByUrlAndStatus(kafkaDf)
        .writeStream
        .format("console")
        .trigger(Trigger.ProcessingTime("1 seconds"))
        // Sorting a streaming aggregation is only supported in Complete output mode.
        .outputMode(OutputMode.Complete())
        .start()

      query.awaitTermination()
    } finally {
      // Release the local Spark context even when the query terminates with an error.
      spark.stop()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment