HeartSaVioR/3bd3dba42f899d3174bb5fcd18914f79 (last active April 23, 2018 02:56)
Structured Streaming: select, from each group, the row with the maximum value of a specific field (requires aggregation)
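The trick relies on ordering. Taking the max of a struct whose first field is AMOUNT returns the entire row with the largest AMOUNT, because Spark compares struct values field by field. Scala tuples are ordered the same way, so the selection can be modeled without Spark. This is a plain-Scala sketch, not the Spark API. `MaxRowPerGroup` and `maxRowPerGroup` are hypothetical names, and the rows are the sample input from the comment block in the code below.

```scala
// Plain-Scala model (not Spark) of "for each id, keep the row with the max amount".
// Tuples compare lexicographically, mirroring how Spark orders struct values:
// the first field decides, later fields only break ties.
object MaxRowPerGroup {
  // (id, amount, myTimestamp) rows from the sample input
  val input: Seq[(Int, Int, String)] = Seq(
    (1, 5, "2018-04-01T01:00:00.000Z"),
    (1, 10, "2018-04-01T01:10:00.000Z"),
    (2, 20, "2018-04-01T01:20:00.000Z"),
    (2, 30, "2018-04-01T01:25:00.000Z"),
    (2, 40, "2018-04-01T01:30:00.000Z")
  )

  def maxRowPerGroup(rows: Seq[(Int, Int, String)]): Seq[(Int, Int, String)] =
    rows
      .groupBy(_._1) // group by id
      .values
      .map { group =>
        // Reorder to (amount, id, ts) so amount is compared first,
        // like struct($"AMOUNT", ...) in the Spark query, then take the max.
        val (amount, id, ts) = group.map { case (i, a, t) => (a, i, t) }.max
        (id, amount, ts)
      }
      .toSeq
      .sortBy(_._1) // deterministic output order

  def main(args: Array[String]): Unit =
    maxRowPerGroup(input).foreach(println)
}
```

If two rows in a group share the maximum amount, the comparison falls through to the next fields; with ISO-8601 timestamp strings in the same format, that means the later timestamp wins.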
// Answer to a question about Structured Streaming on the Apache Spark user mailing list:
// http://apache-spark-user-list.1001560.n3.nabble.com/can-we-use-mapGroupsWithState-in-raw-sql-tp31885p31893.html
/*
Input:
id | amount | my_timestamp
-------------------------------------------
1  | 5      | 2018-04-01T01:00:00.000Z
1  | 10     | 2018-04-01T01:10:00.000Z
2  | 20     | 2018-04-01T01:20:00.000Z
2  | 30     | 2018-04-01T01:25:00.000Z
2  | 40     | 2018-04-01T01:30:00.000Z

Expected output (for each id, the row with the highest amount):
id | amount | my_timestamp
-------------------------------------------
1  | 10     | 2018-04-01T01:10:00.000Z
2  | 40     | 2018-04-01T01:30:00.000Z
*/
// Intended for spark-shell (enter with :paste), where `spark` is a predefined SparkSession.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types._
import spark.implicits._
// Read newline-delimited text from a socket; each line is expected to be one JSON record.
val socketDF = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
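// MY_TIMESTAMP must be TimestampType: DateType would drop the time of day
// that appears in the expected output.
val schema = StructType(Seq(
  StructField("ID", IntegerType, true),
  StructField("AMOUNT", IntegerType, true),
  StructField("MY_TIMESTAMP", TimestampType, true)
))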
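val query = socketDF
  .selectExpr("CAST(value AS STRING) AS value")
  .as[String]
  .select(from_json($"value", schema).as("data"))
  .select($"data.*")
  .groupBy($"ID")
  // Structs compare field by field, so the max of a struct whose first field is
  // AMOUNT is the whole row with the largest AMOUNT in each group. Listing the
  // columns explicitly (instead of $"*") avoids a duplicate AMOUNT field.
  .agg(max(struct($"AMOUNT", $"ID", $"MY_TIMESTAMP")).as("data"))
  .select($"data.ID", $"data.AMOUNT", $"data.MY_TIMESTAMP")
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("1 second"))
  // Update mode prints only the groups updated by each trigger; the per-ID
  // state is kept for the lifetime of the query.
  .outputMode(OutputMode.Update())
  .start()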
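To try the query, something has to write JSON lines to port 9999. The Spark Structured Streaming guide uses `nc -lk 9999` as a simple data server for the socket source. The sketch below generates such lines from the sample input. `SampleJsonLines` is a hypothetical helper, and it assumes the JSON keys must match the schema's field names (`ID`, `AMOUNT`, `MY_TIMESTAMP`) exactly.

```scala
// Hypothetical helper (not part of the original gist): renders the sample input
// as one JSON object per line, the shape from_json($"value", schema) parses.
// Assumption: keys are spelled exactly like the schema field names.
object SampleJsonLines {
  // (ID, AMOUNT, MY_TIMESTAMP) rows from the sample input
  val rows: Seq[(Int, Int, String)] = Seq(
    (1, 5, "2018-04-01T01:00:00.000Z"),
    (1, 10, "2018-04-01T01:10:00.000Z"),
    (2, 20, "2018-04-01T01:20:00.000Z"),
    (2, 30, "2018-04-01T01:25:00.000Z"),
    (2, 40, "2018-04-01T01:30:00.000Z")
  )

  // Renders one row as a single-line JSON object.
  def toJsonLine(id: Int, amount: Int, ts: String): String =
    s"""{"ID":$id,"AMOUNT":$amount,"MY_TIMESTAMP":"$ts"}"""

  def lines: Seq[String] = rows.map { case (id, amount, ts) => toJsonLine(id, amount, ts) }

  def main(args: Array[String]): Unit = lines.foreach(println)
}
```

Start `nc -lk 9999` before starting the query, then paste the printed lines into it; pasting them across several triggers shows the per-ID maximum being updated.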
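With `OutputMode.Update()`, each trigger prints only the IDs whose aggregate was updated in that micro-batch, while the per-ID maximum is kept as state across triggers. The model below sketches that behavior in plain Scala, under the simplifying assumption that every ID present in a batch is emitted with its merged maximum, even when the maximum did not change. `UpdateModeModel`, `processBatch`, and the batch split are illustrative, not Spark internals.

```scala
// Simplified model (not Spark code) of the streaming max-row aggregation in
// Update output mode. Assumption: each trigger emits the merged aggregate for
// every ID present in that micro-batch; state is kept for every ID seen.
object UpdateModeModel {
  type Record = (Int, Int, String) // (ID, AMOUNT, MY_TIMESTAMP)

  // Order records by (AMOUNT, ID, MY_TIMESTAMP), like struct($"AMOUNT", ...).
  val byAmount: Ordering[Record] =
    Ordering.by[Record, (Int, Int, String)] { case (id, amount, ts) => (amount, id, ts) }

  /** Merges one micro-batch into the state; returns (new state, emitted records). */
  def processBatch(state: Map[Int, Record], batch: Seq[Record]): (Map[Int, Record], Seq[Record]) = {
    val updated: Map[Int, Record] = batch.groupBy(_._1).map { case (id, records) =>
      // Candidates: the previous max for this ID (if any) plus the new records.
      id -> (state.get(id).toSeq ++ records).max(byAmount)
    }
    (state ++ updated, updated.values.toSeq.sortBy(_._1))
  }

  def main(args: Array[String]): Unit = {
    val batches: Seq[Seq[Record]] = Seq(
      Seq((1, 5, "2018-04-01T01:00:00.000Z"), (2, 20, "2018-04-01T01:20:00.000Z")),
      Seq((1, 10, "2018-04-01T01:10:00.000Z"), (2, 30, "2018-04-01T01:25:00.000Z"),
        (2, 40, "2018-04-01T01:30:00.000Z"))
    )
    var state = Map.empty[Int, Record]
    for ((batch, i) <- batches.zipWithIndex) {
      val (next, emitted) = processBatch(state, batch)
      println(s"Batch $i: ${emitted.mkString(", ")}")
      state = next
    }
  }
}
```

Splitting the sample input into two batches like this ends with the same rows as the expected output; a batch containing only a smaller amount for an existing ID would, in this model, re-emit that ID with its unchanged maximum.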