@ldacosta
Last active August 29, 2015 14:08
Spark SQL: register a table using a case class that has user-defined types
/**
* We want to register a table with Spark SQL whose rows are composed of 2 Longs and 1 String.
* We would like to put restrictions on the shape of the String: for example, that it doesn't
* contain non-alphanumeric characters (or whatever...).
* Let's define what we want:
*/
// CleanString defined somewhere else
case class MyCaseClass(firstP: Long, secondP: Long, thirdP: CleanString)
// The obvious advantage of this signature is that it forces the string to be the way I want it;
// otherwise the compiler screams.
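/**
* A minimal sketch of what CleanString could look like. The gist does not show it, so the
* class body and the "letters and digits only" rule below are assumptions. A private
* constructor plus a validating apply in the companion object means the only way to get a
* CleanString is through the check. The check itself runs at construction time; what the
* compiler enforces is that a plain String cannot be passed where a CleanString is expected,
* so MyCaseClass(1L, 2L, "abc") does not compile.
*/
final class CleanString private (val value: String) {
  override def toString: String = value
}
object CleanString {
  // Hypothetical rule: reject any character that is not a letter or a digit
  // (Char.isLetterOrDigit is Unicode-aware, so accented letters also pass).
  def apply(raw: String): CleanString = {
    require(raw.forall(_.isLetterOrDigit), s"not a clean string: '$raw'")
    new CleanString(raw)
  }
}
// Usage: CleanString("abc123") succeeds; CleanString("abc 123") throws IllegalArgumentException.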
/**
* OK. Let's now suppose that I end up with an RDD that holds the data I want in the table:
*/
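import org.apache.spark.rdd.RDD
val theSchemaRDD: RDD[MyCaseClass] = ... // compute the thing
val sqlContext = new org.apache.spark.sql.SQLContext(sc) // sc: an existing SparkContext
import sqlContext._ // brings the implicit RDD-to-SchemaRDD conversion into scope
theSchemaRDD.registerAsTable("accounts") // <- this fails with a scala.MatchError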
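/**
* An aside: a heavily simplified, hypothetical stand-in for schema inference, showing how
* this kind of code can end in a scala.MatchError. It pattern-matches each field's type
* against the types it supports and has no fallback case, so an unsupported type falls
* through every case. Catalyst's real code matches on Scala reflection types; this sketch
* uses runtime Class objects only to stay short. DataTypeSketch, dataTypeFor and
* UnknownFieldType are made-up names.
*/
object DataTypeSketch {
  sealed trait DataType
  case object LongType extends DataType
  case object StringType extends DataType

  // No catch-all case on purpose: any other class raises scala.MatchError.
  def dataTypeFor(cls: Class[_]): DataType = cls match {
    case c if c == classOf[Long]   => LongType // classOf[Long] is the primitive long class
    case c if c == classOf[String] => StringType
  }
}
// A made-up field type the sketch does not know about, standing in for CleanString.
final class UnknownFieldType(val value: String)
// DataTypeSketch.dataTypeFor(classOf[UnknownFieldType]) throws scala.MatchError.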
/**
* So, *why* does it fail with a MatchError? I think it is because Catalyst tries to infer
* the schema of the RDD and finds a type it does not know
* (cf. https://github.com/apache/spark/blob/05308426f0f51273be95fb1ca2cb1ec19d83cec8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala,
* line 53+, or look up the function schemaFor).
*
* How do we solve that?
* We could try to make the constructor of the case class private, and force access
* through the companion object:
*/
trait anAccount {
  val accountKey: Long
  val accountId: Long
  val accountName: String
}
object anAccount {
  def apply(accountKey: Long, accountId: Long, accountName: String): anAccount = {
    anAccountImpl(accountKey, accountId, CleanString(accountName).toString)
  }
  private case class anAccountImpl(accountKey: Long, accountId: Long, accountName: String) extends anAccount
}
// Unfortunately, theSchemaRDD:
val theSchemaRDD: RDD[anAccount] = ... // compute the thing
// is NOT an RDD of a case class anymore, and therefore
// can NOT be implicitly converted into a SchemaRDD, so I cannot register it as a table
// (I have yet to find references or the exact code backing this assertion).
// The advantage here is that, at least, the compiler complains...
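/**
* A possible explanation for the assertion above, not verified against the exact Spark
* version used here: as far as I can tell, in Spark 1.x the implicit conversion that
* import sqlContext._ brings into scope (createSchemaRDD) only accepts an RDD[A] with
* A <: Product. Case classes extend Product, but the trait anAccount does not, so an
* RDD[anAccount] does not qualify, even though every element is at runtime an anAccountImpl,
* i.e. a case class: implicit search only looks at the static type. The sketch below
* reproduces the effect without Spark; BoundSketch, TableLike, toTableLike, Account and
* AccountRow are hypothetical names.
*/
import scala.language.implicitConversions

object BoundSketch {
  // Hypothetical stand-in for a SchemaRDD-like wrapper.
  final class TableLike[A](val rows: Seq[A]) {
    def registerAs(name: String): String = s"$name: ${rows.size} rows"
  }
  // Applies only when the *static* element type is a subtype of Product.
  implicit def toTableLike[A <: Product](rows: Seq[A]): TableLike[A] = new TableLike(rows)

  // Same shape as anAccount above: a public trait with a private case class behind it.
  trait Account { def key: Long }
  object Account {
    def apply(key: Long): Account = AccountImpl(key)
    private case class AccountImpl(key: Long) extends Account
  }

  // A plain case class, which does extend Product.
  case class AccountRow(key: Long, name: String)
}
// import BoundSketch._
// Seq(AccountRow(1L, "abc")).registerAs("accounts") // compiles: AccountRow <: Product
// Seq[Account](Account(1L)).registerAs("accounts")  // does not compile: Account is not <: Product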