Skip to content

Instantly share code, notes, and snippets.

@dgadiraju
Last active February 17, 2019 18:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save dgadiraju/850065cb572fa4d15730464a465ac7bb to your computer and use it in GitHub Desktop.
Save dgadiraju/850065cb572fa4d15730464a465ac7bb to your computer and use it in GitHub Desktop.
import org.apache.spark.sql.SparkSession
// Structured Streaming demo: streams the CSV files under the retail_db orders
// directory into an in-memory table named "orders" and prints its contents.
// Trailing-dot chaining is kept so the script can be pasted line by line into
// spark-shell.
val spark = SparkSession.
  builder.
  master("local").
  appName("Spark Structured Streaming Demo").
  getOrCreate()

// Only ERROR-level log messages are emitted from here on.
spark.sparkContext.setLogLevel("ERROR")

// The schema of the streamed CSV files is supplied up front as a DDL string.
val orders = spark.
  readStream.
  schema("order_id INT, order_date STRING, order_customer_id INT, order_status STRING").
  csv("/mnt/c/data/retail_db/orders")

// Memory sink: results are exposed as a temporary table named after the query.
val query = orders.
  writeStream.
  queryName("orders").
  format("memory").
  start()

// start() returns once the query has been launched, not once data has been
// processed. Block until everything currently available at the source has
// reached the sink; otherwise the select below can run against an empty table.
query.processAllAvailable()

spark.sql("select * from orders").show()
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
// Department traffic demo: reads text lines from a socket, counts the lines
// whose request path points at a department, and exposes the running counts as
// the in-memory table "department_count".
val spark = SparkSession.
  builder.
  appName("Get Department Traffic").
  master("local").
  getOrCreate()
import spark.implicits._
spark.sparkContext.setLogLevel("ERROR")

// Text received from localhost:9999; referenced below through its "value" column.
val lines = spark.readStream.
  format("socket").
  option("host", "localhost").
  option("port", "9999").
  load()

val departmentTraffic = {
  // Seventh space-separated token of each input line, split on "/" into
  // segments. Presumably the request path of a web access log entry — confirm
  // against the data actually fed to the socket.
  val pathSegments = split(split($"value", " ").getItem(6), "/")

  lines.
    filter(pathSegments.getItem(1) === "department").
    select(pathSegments.getItem(2).as("department_name")).
    groupBy($"department_name").
    agg(count($"department_name").as("department_count"))
}

// Complete output mode: the whole aggregated result is written to the memory
// table on every update.
val query = departmentTraffic.
  writeStream.
  format("memory").
  queryName("department_count").
  outputMode("complete").
  start()

// Runs immediately after start(), so it shows whatever the table holds at that
// moment; re-run the select to see later counts.
spark.sql("select * from department_count").show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment