Last active
June 28, 2016 00:23
-
-
Save dondrake/be6b92aff71433e9fb627b478b78b839 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder | |
import org.apache.spark.sql.expressions.Aggregator | |
import org.apache.spark.sql.{Encoder,Row} | |
import org.apache.spark.sql.functions._ | |
import java.util.Calendar | |
import sqlContext.implicits._ | |
/** Immutable record for one team's revenue entry.
  *
  * @param f1 hash key used for grouping (see `groupByKey(_.f1)` below)
  * @param f2 division code (e.g. "NLC")
  * @param f3 team name
  * @param f4 entry date
  * @param f5 amount
  */
final case class C1(f1: String, f2: String, f3: String, f4: java.sql.Date, f5: Double)
// Small fixture Dataset[C1] built from four hand-written rows.
// `sc.parallelize(...)` distributes the local Seq as an RDD; `.toDS` converts it
// to a Dataset using the `sqlContext.implicits._` import above (spark-shell only).
// Note: "hash1" appears twice, so groupByKey(_.f1) below yields one group of two rows.
val teams = sc.parallelize(Seq(
  C1("hash1", "NLC", "Cubs", java.sql.Date.valueOf("2016-01-23"), 3253.21),
  C1("hash1", "NLC", "Cubs", java.sql.Date.valueOf("2014-01-23"), 353.88),
  C1("hash3", "NLW", "Dodgers", java.sql.Date.valueOf("2013-08-15"), 4322.12),
  C1("hash4", "NLE", "Red Sox", java.sql.Date.valueOf("2010-03-14"), 10283.72)
)).toDS
// https://docs.cloud.databricks.com/docs/spark/1.6/examples/Dataset%20Aggregator.html | |
// Typed Aggregator[IN, BUF, OUT] = [C1, Seq[C1], Seq[C1]]: collects every C1
// row of a group into a Seq[C1] (i.e. a typed "collect_list").
object C1Agg extends Aggregator[C1, Seq[C1], Seq[C1]] {
  // Neutral buffer: the empty sequence.
  def zero: Seq[C1] = Seq.empty[C1] //Nil
  // Fold one input row into a partition's buffer.
  // NOTE(review): `:+` on Seq is O(n) per append, so a large group costs O(n^2);
  // acceptable for this demo-sized data.
  def reduce(b: Seq[C1], a: C1): Seq[C1] = b :+ a
  // Combine two partial buffers from different partitions.
  def merge(b1: Seq[C1], b2: Seq[C1]): Seq[C1] = b1 ++ b2
  // The buffer already is the final result.
  def finish(r: Seq[C1]): Seq[C1] = r
  // Encoders for the intermediate buffer and the output.
  // NOTE(review): bare ExpressionEncoder() relies on implicit TypeTag resolution
  // at this call site; the MissingRequirementError pasted in the comments below
  // suggests this can fail in some REPL/spark-shell line-wrapping situations —
  // confirm, and consider an explicit encoder if it reproduces.
  override def bufferEncoder: Encoder[Seq[C1]] = ExpressionEncoder()
  override def outputEncoder: Encoder[Seq[C1]] = ExpressionEncoder()
}
// doesn't work:
// val g_c1 = teams.select(C1Agg.toColumn)
// works:
// Group rows by the f1 hash and aggregate each group into a Seq[C1] via C1Agg.
// show(false) prints the result untruncated; it returns Unit, so g_c1: Unit —
// drop the trailing .show(false) if you want to keep the aggregated Dataset.
val g_c1 = teams.groupByKey(_.f1).agg(C1Agg.toColumn).show(false)
I just pasted this in a Spark 2.0.0-preview spark-shell and it ran without errors. I get a similar error if I run the commented out select (teams.select(C1Agg.toColumn))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi dondrake,
I tried to run your code (using groupBy, not select), but it doesn't work:
scala.reflect.internal.MissingRequirementError: object $line40.$read not found. at scala.reflect.internal.MissingRequirementError$.signal(MissingRequirementError.scala:16) at scala.reflect.internal.MissingRequirementError$.notFound(MissingRequirementError.scala:17) at scala.reflect.internal.Mirrors$RootsBase.ensureModuleSymbol(Mirrors.scala:126) at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:161) at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:21) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$typecreator2$1.apply(<console>:40) at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:231) at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:231) at org.apache.spark.sql.SQLImplicits$$typecreator10$1.apply(SQLImplicits.scala:96) at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:231) at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:231) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:50) at org.apache.spark.sql.SQLImplicits.newProductSeqEncoder(SQLImplicits.scala:96) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:51) at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:53) at $iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
Any hints?
Thank you.