Skip to content

Instantly share code, notes, and snippets.

@dondrake
Last active June 28, 2016 00:23
Show Gist options
  • Save dondrake/be6b92aff71433e9fb627b478b78b839 to your computer and use it in GitHub Desktop.
Save dondrake/be6b92aff71433e9fb627b478b78b839 to your computer and use it in GitHub Desktop.
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder,Row}
import org.apache.spark.sql.functions._
import java.util.Calendar
import sqlContext.implicits._
/** Immutable row type for the sample data: team hash, division, name, date, and a numeric value.
  * Marked `final` — case classes should not be extended (equals/hashCode break under inheritance).
  */
final case class C1(f1: String, f2: String, f3: String, f4: java.sql.Date, f5: Double)
// Sample roster rows, distributed via the spark-shell's SparkContext and
// converted to a typed Dataset[C1] (toDS comes from sqlContext.implicits).
val sampleRows = Seq(
  C1("hash1", "NLC", "Cubs", java.sql.Date.valueOf("2016-01-23"), 3253.21),
  C1("hash1", "NLC", "Cubs", java.sql.Date.valueOf("2014-01-23"), 353.88),
  C1("hash3", "NLW", "Dodgers", java.sql.Date.valueOf("2013-08-15"), 4322.12),
  C1("hash4", "NLE", "Red Sox", java.sql.Date.valueOf("2010-03-14"), 10283.72)
)
val teams = sc.parallelize(sampleRows).toDS
// Aggregator that collects every C1 row of a group into a single Seq[C1].
// Type params: input = C1, buffer = Seq[C1], output = Seq[C1].
// Modeled on: https://docs.cloud.databricks.com/docs/spark/1.6/examples/Dataset%20Aggregator.html
object C1Agg extends Aggregator[C1, Seq[C1], Seq[C1]] {
  // Start each group from an empty sequence.
  def zero: Seq[C1] = Seq.empty[C1]

  // Fold one input row into the running buffer.
  def reduce(b: Seq[C1], a: C1): Seq[C1] = b :+ a

  // Combine partial buffers produced on different partitions.
  def merge(b1: Seq[C1], b2: Seq[C1]): Seq[C1] = b1 ++ b2

  // The accumulated sequence is itself the final result.
  def finish(r: Seq[C1]): Seq[C1] = r

  // Encoders for the intermediate buffer and the output value;
  // the type parameter is spelled out for clarity (inferred in the original).
  override def bufferEncoder: Encoder[Seq[C1]] = ExpressionEncoder[Seq[C1]]()
  override def outputEncoder: Encoder[Seq[C1]] = ExpressionEncoder[Seq[C1]]()
}
// doesn't work (untyped select over the aggregator column — see gist discussion):
// val g_c1 = teams.select(C1Agg.toColumn)
// works: group by f1, then aggregate each group into a Seq[C1].
// BUG FIX: the original assigned the result of show(false) — which returns
// Unit — to g_c1, making the val useless. Keep the aggregated Dataset in
// g_c1 so it can be reused, and call show(false) separately for display.
val g_c1 = teams.groupByKey(_.f1).agg(C1Agg.toColumn)
g_c1.show(false)
@dondrake
Copy link
Author

I just pasted this in a Spark 2.0.0-preview spark-shell and it ran without errors. I get a similar error if I run the commented-out select (teams.select(C1Agg.toColumn)).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment