Skip to content

Instantly share code, notes, and snippets.

@dondrake
Last active August 1, 2016 12:11
Show Gist options
  • Save dondrake/91efc0cb2e10c7ddf0573c5ebbcc268a to your computer and use it in GitHub Desktop.
Save dondrake/91efc0cb2e10c7ddf0573c5ebbcc268a to your computer and use it in GitHub Desktop.
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder,Row}
import org.apache.spark.sql.functions._
import java.util.Calendar
import sqlContext.implicits._
/** Flat record with three string fields, a SQL date and a double. */
case class C1(
  f1: String,
  f2: String,
  f3: String,
  f4: java.sql.Date,
  f5: Double
)

/** An `f2` string paired with a sequence of [[C1]] records. */
case class C2(
  f2: String,
  seqC1: Seq[C1]
)

/** An `f2` string paired with a nested sequence (sequence of sequences) of [[C1]] records. */
case class C3(
  f2: String,
  seqOfSeqC1: Seq[Seq[C1]]
)
// Sample Dataset of seven C1 rows, distributed through the shell's SparkContext (`sc`)
// and converted with the `toDS` syntax brought in by `sqlContext.implicits._`.
val teams = {
  // Shorthand for turning a yyyy-mm-dd literal into a java.sql.Date.
  def day(iso: String): java.sql.Date = java.sql.Date.valueOf(iso)

  val rows = Seq(
    C1("hash1", "NLC", "Cubs", day("2016-01-23"), 3253.21),
    C1("hash2", "NLC", "Cardinals", day("2014-01-23"), 353.88),
    C1("hash2", "NLC", "Pirates", day("2014-01-23"), 275.66),
    C1("hash2", "NLC", "Reds", day("2014-01-23"), 583.84),
    C1("hash2", "NLC", "Brewers", day("2014-01-23"), 349.12),
    C1("hash3", "NLW", "Dodgers", day("2013-08-15"), 4322.12),
    C1("hash4", "NLE", "Red Sox", day("2010-03-14"), 10283.72)
  )

  sc.parallelize(rows).toDS
}
// Groups `teams` by each row's f2 and emits one C3 per group. The C3 carries the
// grouping key as its f2 and the group's rows twice, as a two-element nested list.
val groupDS = teams.groupBy(_.f2).mapGroups { (k, vals) =>
  // `vals` is an Iterator; copy it into a strict List so the same rows can be
  // referenced twice below and nothing lazy is handed back to Spark.
  val members = vals.toList
  // Use the grouping key for C3.f2. The previous code passed the f1 of the last
  // row in the group (held in a local misleadingly named `first`), so the f2
  // field ended up holding an f1 value and `k` went unused.
  C3(k, List(members, members))
}
// Bare reference: in the shell this just echoes the Dataset's description.
groupDS

// Write the grouped data out as parquet under the relative path "test".
// NOTE(review): the writer's default save mode is believed to reject an existing
// target, so re-running without removing "test" would fail — confirm.
groupDS.toDF().write.parquet("test")

// Read the same path back and view it as a Dataset[C3].
// `ds` is not used by any later statement visible in this script.
val ds = sqlContext.read.parquet("test").as[C3]

// Untyped DataFrame view of the grouped data, printed to the console.
val gdf = groupDS.toDF()
gdf.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment