Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anjijava16/108779f3b8739358258c449fa7bd16e0 to your computer and use it in GitHub Desktop.
Save anjijava16/108779f3b8739358258c449fa7bd16e0 to your computer and use it in GitHub Desktop.
import org.apache.spark.sql.SparkSession
// Create (or reuse) the SparkSession for this script: local master, application
// name "Get Department Traffic". The builder chain is wrapped in a block so the
// whole definition is a single expression.
val spark = {
  val sessionBuilder = SparkSession
    .builder()
    .master("local")
    .appName("Get Department Traffic")
  sessionBuilder.getOrCreate()
}
// Set the SparkContext log level to ERROR.
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
// Set spark.sql.shuffle.partitions to "2" for this session.
spark.conf.set("spark.sql.shuffle.partitions", "2")
// Streaming DataFrame read from the socket source at localhost:9999.
// Later stages read each incoming record from a column named "value".
val lines = {
  val socketReader = spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", "9999")
  socketReader.load()
}
import org.apache.spark.sql.functions._
// Keep only the records whose request path has "department" as its first
// segment, and derive two columns from the raw text before dropping it:
//   department_name - the path segment following "department"
//   visit_time      - the timestamp parsed from field 3 of the record
// NOTE(review): the field positions (3 and 6) presumably match a web
// access-log layout; confirm against the actual producer of the socket data.
val departmentLines = {
  // Space-separated fields of the raw record.
  val fields = split($"value", " ")
  // Field 6 broken into its "/"-separated segments.
  val pathSegments = split(fields(6), "/")
  // Field 3 with the leading "[" stripped, parsed as dd/MMM/yyyy:HH:mm:ss.
  val visitTime = to_timestamp(ltrim(fields(3), "["), "dd/MMM/yyyy:HH:mm:ss")

  lines
    .filter(pathSegments(1) === "department")
    .withColumn("department_name", pathSegments(2))
    .withColumn("visit_time", visitTime)
    .drop($"value")
}
// Count visits per department over sliding windows of visit_time:
// each window is 60 seconds long and a new window starts every 20 seconds.
// The result has the window, department_name and department_count columns.
val departmentTraffic = {
  val visitWindow = window($"visit_time", "60 seconds", "20 seconds")
  val visitCount = count("department_name").alias("department_count")

  departmentLines
    .groupBy(visitWindow, $"department_name")
    .agg(visitCount)
}
import org.apache.spark.sql.streaming.Trigger
// Start the streaming query that writes the windowed department counts to the
// in-memory table "department_traffic", triggering every 20 seconds.
// spark.sql.shuffle.partitions is already set to "2" earlier in this script,
// before the stream is defined, so the setting is not repeated here.
// NOTE(review): the Structured Streaming guide lists only "append" and
// "complete" as output modes for the memory sink; confirm that "update"
// produces the intended table contents on the Spark version in use.
val query = {
  val memoryWriter = departmentTraffic.writeStream
    .queryName("department_traffic")
    .format("memory")
    .outputMode("update")
    .trigger(Trigger.ProcessingTime("20 seconds"))
  memoryWriter.start()
}
// Print the current contents of the in-memory table without truncating
// column values.
// NOTE(review): this runs right after start(), so the table may still be
// empty; re-run this statement once at least one trigger has completed.
spark.sql("SELECT * FROM department_traffic").show(truncate = false)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment