@explicite
Created March 22, 2019 12:29
Reduce by key on a Dataset in Spark
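import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

// Reduces a Dataset by key: `by` extracts the key, `merge` combines two values that share a key.
// `merge` should be associative and commutative, because it is applied in no fixed order.
def reduceBy[A, B](merge: (A, A) => A)(
    by: A => B
)(ds: Dataset[A])(implicit session: SparkSession, encoderA: Encoder[A], encoderB: Encoder[B]): Dataset[A] = {
  // Pre-reduce each partition so that at most one row per key leaves it.
  // Note: `toList` materializes the whole partition in memory.
  def reducePartition(iter: Iterator[A]): Iterator[A] = {
    iter.toList
      .groupBy(by)
      .valuesIterator
      .map(values => values.reduce(merge))
  }

  ds.mapPartitions(reducePartition)
    .groupByKey(by)      // group the pre-reduced rows by key across partitions
    .reduceGroups(merge) // yields Dataset[(B, A)]
    .map { case (_, reduced) => reduced } // drop the key, keep the merged value
}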
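To see what the per-partition step does without starting Spark, the same grouping logic can be run on a plain Scala iterator. This is only a sketch: the `Update` case class, the `ReducePartitionSketch` object, and the sample values are hypothetical and not part of the gist.

```scala
// Hypothetical record type, used only for this illustration.
final case class Update(id: String, count: Int)

object ReducePartitionSketch {
  // Same logic as the partition-level step above, lifted out so it runs without Spark.
  def reducePartition[A, B](merge: (A, A) => A)(by: A => B)(iter: Iterator[A]): Iterator[A] =
    iter.toList
      .groupBy(by)
      .valuesIterator
      .map(values => values.reduce(merge))

  def main(args: Array[String]): Unit = {
    // One "partition" holding two rows for key "a" and one row for key "b".
    val partition = Iterator(Update("a", 1), Update("b", 2), Update("a", 3))

    // Merge two updates for the same id by summing their counts.
    val sumCounts: (Update, Update) => Update =
      (x, y) => x.copy(count = x.count + y.count)

    val reduced = reducePartition[Update, String](sumCounts)(_.id)(partition)
      .toList
      .sortBy(_.id) // groupBy gives no ordering guarantee, so sort for a stable printout

    println(reduced) // List(Update(a,4), Update(b,2))
  }
}
```

Each key appears at most once in the output of a partition, which is why the later `groupByKey(by).reduceGroups(merge)` only has to combine one row per key per partition.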