
@oripwk
Created July 31, 2018 11:57
Structured Streaming: keep only the latest record per key
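// Assumes `spark` is an active SparkSession (e.g. the one spark-shell provides).
// Input: "key,value" lines on a socket, where value is an integer (e.g. a version
// or timestamp); the "latest" record for a key is the one with the largest value.
import org.apache.spark.sql.streaming.{GroupState, Trigger}
import spark.implicits._ // encoders for .as[String] and the (String, String) tuples

val query = spark
  .readStream
  .format("socket")
  .option("host", "127.0.0.1")
  .option("port", 9999)
  .load()
  .as[String]
  .map { value =>
    val split = value.split(",")
    (split(0), split(1)) // (key, value)
  }
  .groupByKey(_._1)
  .mapGroupsWithState { (_: String, tuples: Iterator[(String, String)], state: GroupState[(String, String)]) =>
    val list = tuples.toList
    // Merge the record stored for this key (if any) with the new ones and keep the max.
    val max = state.getOption.fold(list)(_ :: list).maxBy(_._2.toInt)
    state.update(max)
    max // one output row per key per trigger in which that key received data
  }
  .writeStream
  .outputMode("update") // streaming mapGroupsWithState is only supported in update mode
  .trigger(Trigger.ProcessingTime(3000)) // run a micro-batch every 3000 ms
  .format("console")
  .start()

query.awaitTermination()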
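To see what the stateful function does across triggers without running Spark, here is a minimal plain-Scala sketch. It is a hypothetical illustration, not part of the original gist: the names `LatestPerKeySketch`, `latest` and `replay` are made up, an immutable `Map` stands in for `GroupState`, and each inner list stands in for the socket lines of one micro-batch. It ignores checkpointing, timeouts and partitioning.

```scala
// Hypothetical stand-alone sketch (no Spark): replays what the
// mapGroupsWithState function above does for each key on each trigger.
object LatestPerKeySketch {
  type Record = (String, String) // (key, value); value must parse as an Int

  // Same rule as the gist: merge the stored record (if any) with the new
  // records for this key and keep the one with the largest numeric value.
  def latest(stored: Option[Record], batch: List[Record]): Record =
    stored.fold(batch)(_ :: batch).maxBy(_._2.toInt)

  // Folds micro-batches into a Map that plays the role of GroupState and
  // collects the rows that "update" output mode would emit per batch.
  def replay(batches: List[List[String]]): (Map[String, Record], List[List[Record]]) =
    batches.foldLeft((Map.empty[String, Record], List.empty[List[Record]])) {
      case ((state, emitted), lines) =>
        val parsed = lines.map { line =>
          val split = line.split(",")
          (split(0), split(1))
        }
        // Only keys that received data in this batch are updated and emitted.
        val updates: Map[String, Record] = parsed.groupBy(_._1).map { case (key, recs) =>
          key -> latest(state.get(key), recs)
        }
        (state ++ updates, emitted :+ updates.values.toList.sortBy(_._1))
    }

  def main(args: Array[String]): Unit = {
    val (finalState, emitted) = replay(List(
      List("a,1", "b,5", "a,3"),
      List("a,2", "b,7")
    ))
    emitted.zipWithIndex.foreach { case (rows, i) => println(s"batch $i: $rows") }
    println(s"final state: $finalState")
  }
}
```

With the sample input in `main`, batch 0 emits `(a,3)` and `(b,5)`; batch 1 emits `(a,3)` again (the new `a,2` is older than the stored record) and `(b,7)`. This mirrors update mode with `mapGroupsWithState`: every key that receives data in a trigger produces a row, even if its latest record did not change.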