@HeartSaVioR
Last active April 23, 2018 02:56
Structured Streaming: select the row with the maximum value of a specific field from each group (requires aggregation)
// This answers a question about Structured Streaming asked on the Apache Spark user mailing list:
// http://apache-spark-user-list.1001560.n3.nabble.com/can-we-use-mapGroupsWithState-in-raw-sql-tp31885p31893.html
/*
Input:
id | amount | my_timestamp
---+--------+-------------------------
1  | 5      | 2018-04-01T01:00:00.000Z
1  | 10     | 2018-04-01T01:10:00.000Z
2  | 20     | 2018-04-01T01:20:00.000Z
2  | 30     | 2018-04-01T01:25:00.000Z
2  | 40     | 2018-04-01T01:30:00.000Z

Expected output:
id | amount | my_timestamp
---+--------+-------------------------
1  | 10     | 2018-04-01T01:10:00.000Z
2  | 40     | 2018-04-01T01:30:00.000Z
*/
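
// A minimal plain-Scala sketch (no Spark involved) of what the query computes, written as a
// batch over the sample input above. For each id it keeps the whole row with the largest amount.
// `SampleRow`, `sampleInput` and `expectedOutput` are hypothetical names for illustration only.
case class SampleRow(id: Int, amount: Int, myTimestamp: String)

val sampleInput = Seq(
  SampleRow(1, 5, "2018-04-01T01:00:00.000Z"),
  SampleRow(1, 10, "2018-04-01T01:10:00.000Z"),
  SampleRow(2, 20, "2018-04-01T01:20:00.000Z"),
  SampleRow(2, 30, "2018-04-01T01:25:00.000Z"),
  SampleRow(2, 40, "2018-04-01T01:30:00.000Z")
)

// group by id, then keep the row with the maximum amount in each group
val expectedOutput: Seq[SampleRow] = sampleInput
  .groupBy(_.id)
  .values
  .map(_.maxBy(_.amount))
  .toSeq
  .sortBy(_.id)
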
// Assumes `spark` is an existing SparkSession, e.g. the one spark-shell provides
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types._
import spark.implicits._
val socketDF = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
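
// The socket source above turns every line received on localhost:9999 into one row with a single
// string column named `value`. The sample input therefore has to arrive as one JSON object per
// line, with keys spelled as in the schema below. This is a sketch that assumes the lines are typed
// into a netcat listener such as `nc -lk 9999`. `toJsonLine` and `sampleJsonLines` are hypothetical
// helpers, not part of the original answer.
def toJsonLine(id: Int, amount: Int, myTimestamp: String): String =
  s"""{"ID":$id,"AMOUNT":$amount,"MY_TIMESTAMP":"$myTimestamp"}"""

val sampleJsonLines: Seq[String] = Seq(
  (1, 5, "2018-04-01T01:00:00.000Z"),
  (1, 10, "2018-04-01T01:10:00.000Z"),
  (2, 20, "2018-04-01T01:20:00.000Z"),
  (2, 30, "2018-04-01T01:25:00.000Z"),
  (2, 40, "2018-04-01T01:30:00.000Z")
).map { case (id, amount, ts) => toJsonLine(id, amount, ts) }

// print the lines so they can be pasted into the netcat session
sampleJsonLines.foreach(println)
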
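// MY_TIMESTAMP carries a full timestamp, so parse it as TimestampType
// (DateType would drop the time of day shown in the expected output)
val schema = StructType(Seq(
  StructField("ID", IntegerType, nullable = true),
  StructField("AMOUNT", IntegerType, nullable = true),
  StructField("MY_TIMESTAMP", TimestampType, nullable = true)
))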
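
// Why max(struct(...)) in the query below returns a whole row: Spark orders struct values field by
// field, left to right, much like Scala's Ordering for tuples. Because AMOUNT is the first struct
// field, it decides the maximum, and the later fields only break ties between equal amounts.
// This is a plain-Scala analogy, not Spark code. `candidates`, `winner`, `tiedCandidates` and
// `tieWinner` are illustrative names.
val candidates: Seq[(Int, Int, String)] = Seq(
  // (amount, id, timestamp), in the same order as the struct fields
  (20, 2, "2018-04-01T01:20:00.000Z"),
  (40, 2, "2018-04-01T01:30:00.000Z"),
  (30, 2, "2018-04-01T01:25:00.000Z")
)
// Ordering[(Int, Int, String)] compares the first element, then the second, then the third
val winner: (Int, Int, String) = candidates.max

// Hypothetical tie (not in the sample input): AMOUNT and ID are equal, so the timestamp decides.
// ISO-8601 strings in the same zone happen to sort chronologically.
val tiedCandidates: Seq[(Int, Int, String)] = Seq(
  (40, 2, "2018-04-01T01:30:00.000Z"),
  (40, 2, "2018-04-01T01:35:00.000Z")
)
val tieWinner: (Int, Int, String) = tiedCandidates.max
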
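val query = socketDF
  .selectExpr("CAST(value AS STRING) AS value")
  .as[String]
  // parse each line as JSON and flatten it into ID, AMOUNT, MY_TIMESTAMP columns
  .select(from_json($"value", schema).as("data"))
  .select($"data.*")
  .groupBy($"ID")
  // max over a struct compares its fields left to right, so AMOUNT (listed first) decides which row wins;
  // naming the other columns explicitly avoids the duplicate AMOUNT column that
  // struct($"AMOUNT", $"*") would produce
  .agg(max(struct($"AMOUNT", $"ID", $"MY_TIMESTAMP")).as("data"))
  .select($"data.ID", $"data.AMOUNT", $"data.MY_TIMESTAMP")
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("1 second"))
  // Update mode: each trigger prints only the IDs whose aggregate was updated in that batch
  .outputMode(OutputMode.Update())
  .start()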
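
// In OutputMode.Update the console sink prints, after each trigger, only the groups that received
// new rows in that trigger, each with its current aggregate. Groups untouched by the batch are not
// repeated. Because the query has no watermark, the per-ID state is kept indefinitely.
// This is a plain-Scala simulation of that behaviour over two hypothetical micro-batches built from
// the sample input. `aggState` and `runMicroBatch` are illustrative names, not Spark APIs.
import scala.collection.mutable

// ID -> current max of (AMOUNT, ID, MY_TIMESTAMP), kept across micro-batches like streaming state
val aggState = mutable.Map.empty[Int, (Int, Int, String)]

def runMicroBatch(batch: Seq[(Int, Int, String)]): Seq[(Int, Int, String)] = {
  batch.foreach { row =>
    val id = row._2
    // fold the new row into the stored max using tuple (field-by-field) ordering
    aggState(id) = aggState.get(id).fold(row)(current => Ordering[(Int, Int, String)].max(current, row))
  }
  // emit the current aggregate for every ID that appeared in this batch
  batch.map(_._2).distinct.sorted.map(aggState)
}

val batch1Output = runMicroBatch(Seq(
  (5, 1, "2018-04-01T01:00:00.000Z"),
  (10, 1, "2018-04-01T01:10:00.000Z"),
  (20, 2, "2018-04-01T01:20:00.000Z")
))
val batch2Output = runMicroBatch(Seq(
  (30, 2, "2018-04-01T01:25:00.000Z"),
  (40, 2, "2018-04-01T01:30:00.000Z")
))
// batch1Output holds rows for IDs 1 and 2; batch2Output holds only ID 2 (amount 40),
// because ID 1 received no new rows in the second batch
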