@dondrake
Last active June 28, 2016 00:23
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Row}
import org.apache.spark.sql.functions._
import java.util.Calendar
import sqlContext.implicits._

case class C1(f1: String, f2: String, f3: String, f4: java.sql.Date, f5: Double)

// Sample data as a typed Dataset
val teams = sc.parallelize(Seq(
  C1("hash1", "NLC", "Cubs", java.sql.Date.valueOf("2016-01-23"), 3253.21),
  C1("hash1", "NLC", "Cubs", java.sql.Date.valueOf("2014-01-23"), 353.88),
  C1("hash3", "NLW", "Dodgers", java.sql.Date.valueOf("2013-08-15"), 4322.12),
  C1("hash4", "NLE", "Red Sox", java.sql.Date.valueOf("2010-03-14"), 10283.72)
)).toDS

// Typed Aggregator that collects every row of a group into a Seq.
// Based on https://docs.cloud.databricks.com/docs/spark/1.6/examples/Dataset%20Aggregator.html
object C1Agg extends Aggregator[C1, Seq[C1], Seq[C1]] {
  def zero: Seq[C1] = Seq.empty[C1]                       // empty buffer for a new group
  def reduce(b: Seq[C1], a: C1): Seq[C1] = b :+ a         // append one input row
  def merge(b1: Seq[C1], b2: Seq[C1]): Seq[C1] = b1 ++ b2 // combine partial buffers
  def finish(r: Seq[C1]): Seq[C1] = r                     // the buffer is the result
  override def bufferEncoder: Encoder[Seq[C1]] = ExpressionEncoder()
  override def outputEncoder: Encoder[Seq[C1]] = ExpressionEncoder()
}

// doesn't work:
// val g_c1 = teams.select(C1Agg.toColumn)
// works:
val g_c1 = teams.groupByKey(_.f1).agg(C1Agg.toColumn).show(false)
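To make the Aggregator contract concrete, here is a plain-Scala sketch (hypothetical simulation, no Spark required; `C1AggSketch` and `aggregateByKey` are names invented for this illustration) of what the engine conceptually does with `C1Agg`'s callbacks: start each group from `zero`, fold inputs with `reduce` within a partition, combine partition buffers with `merge`, then apply `finish`.

```scala
// Mirrors the C1 case class from the gist so this sketch is self-contained.
case class C1(f1: String, f2: String, f3: String, f4: java.sql.Date, f5: Double)

object C1AggSketch {
  def zero: Seq[C1] = Seq.empty[C1]
  def reduce(b: Seq[C1], a: C1): Seq[C1] = b :+ a
  def merge(b1: Seq[C1], b2: Seq[C1]): Seq[C1] = b1 ++ b2
  def finish(r: Seq[C1]): Seq[C1] = r

  // What groupByKey(_.f1).agg(C1Agg.toColumn) conceptually computes per key,
  // here across several simulated partitions.
  def aggregateByKey(partitions: Seq[Seq[C1]]): Map[String, Seq[C1]] = {
    // Per-partition partial aggregation: fold each key's rows with reduce.
    val partials: Seq[Map[String, Seq[C1]]] =
      partitions.map(_.groupBy(_.f1).map { case (k, rows) =>
        k -> rows.foldLeft(zero)(reduce)
      })
    // Cross-partition combination with merge.
    val merged = partials.foldLeft(Map.empty[String, Seq[C1]]) { (acc, m) =>
      m.foldLeft(acc) { case (a, (k, buf)) =>
        a.updated(k, merge(a.getOrElse(k, zero), buf))
      }
    }
    merged.map { case (k, buf) => k -> finish(buf) }
  }
}
```

With the gist's sample rows split across two partitions, both "hash1" rows end up in the same output buffer even though they were reduced separately.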
@ymazari
ymazari commented Jun 19, 2016

Hi dondrake,
I tried to run your code (using groupByKey, not select), but it doesn't work:

scala.reflect.internal.MissingRequirementError: object $line40.$read not found.
  at scala.reflect.internal.MissingRequirementError$.signal(MissingRequirementError.scala:16)
  at scala.reflect.internal.MissingRequirementError$.notFound(MissingRequirementError.scala:17)
  at scala.reflect.internal.Mirrors$RootsBase.ensureModuleSymbol(Mirrors.scala:126)
  at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:161)
  at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:21)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$typecreator2$1.apply(<console>:40)
  at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:231)
  at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:231)
  at org.apache.spark.sql.SQLImplicits$$typecreator10$1.apply(SQLImplicits.scala:96)
  at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:231)
  at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:231)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:50)
  at org.apache.spark.sql.SQLImplicits.newProductSeqEncoder(SQLImplicits.scala:96)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
  at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:53)
  at $iwC$$iwC$$iwC$$iwC.<init>(<console>:55)

Any hints?
Thank you.

@dondrake
Author

I just pasted this into a Spark 2.0.0-preview spark-shell and it ran without errors. I get a similar error if I run the commented-out select (teams.select(C1Agg.toColumn)).
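The MissingRequirementError above comes out of the reflective ExpressionEncoder being built inside the REPL's synthetic `$iwC`/`$line40` wrapper classes. One workaround sometimes used for encoder trouble in the shell (a hedged sketch, not a confirmed fix for this exact error; it assumes the `C1` case class and `teams` Dataset from the gist are already defined, and trades columnar layout for an opaque binary buffer) is to swap the reflective encoders for kryo-based ones via `Encoders.kryo`:

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Same aggregation logic as C1Agg, but with kryo encoders: the Seq[C1]
// buffer is serialized as a single binary blob, so no case-class
// reflection happens inside the REPL wrapper classes.
object C1AggKryo extends Aggregator[C1, Seq[C1], Seq[C1]] {
  def zero: Seq[C1] = Seq.empty[C1]
  def reduce(b: Seq[C1], a: C1): Seq[C1] = b :+ a
  def merge(b1: Seq[C1], b2: Seq[C1]): Seq[C1] = b1 ++ b2
  def finish(r: Seq[C1]): Seq[C1] = r
  override def bufferEncoder: Encoder[Seq[C1]] = Encoders.kryo[Seq[C1]]
  override def outputEncoder: Encoder[Seq[C1]] = Encoders.kryo[Seq[C1]]
}

// val g_c1 = teams.groupByKey(_.f1).agg(C1AggKryo.toColumn).show(false)
```

Pasting the whole snippet in one `:paste` block (so everything compiles into one wrapper) is another thing worth trying before changing encoders.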
