Skip to content

Instantly share code, notes, and snippets.

@whamtet
Last active August 18, 2016 09:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save whamtet/b0b817250b68557334d669f8fe3cd802 to your computer and use it in GitHub Desktop.
Save whamtet/b0b817250b68557334d669f8fe3cd802 to your computer and use it in GitHub Desktop.
import org.apache.spark.sql.{SQLContext, DataFrame}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{ExplainCommand, QueryExecution}
import org.apache.spark.sql.catalyst.InternalRow
// Sorry about the Clojure; we need it to sidestep the Scala type system.
import clojure.java.api.Clojure
import clojure.lang.IFn
/**
 * A [[DataFrame]] subclass that adds helpers for applying a whole-frame
 * transformation to the frame itself.
 *
 * @param sqlContext  context forwarded unchanged to the `DataFrame` constructor
 * @param logicalPlan plan forwarded unchanged to the `DataFrame` constructor
 */
class BetterDataFrame(override val sqlContext: SQLContext, override val logicalPlan: LogicalPlan) extends DataFrame(sqlContext, logicalPlan) {

  /**
   * Applies a Clojure function to this frame.
   *
   * The parameter is typed `AnyRef` and cast internally, so callers can pass a
   * value that is not statically typed as `IFn`.
   *
   * @param f a Clojure function object; it must implement `clojure.lang.IFn`
   * @return whatever `f` returns when invoked with this frame, cast to `DataFrame`
   * @throws ClassCastException if `f` is not an `IFn`, or if its result is a
   *                            non-null value that is not a `DataFrame`
   */
  def invoke_clojure(f: AnyRef): DataFrame =
    f.asInstanceOf[IFn].invoke(this).asInstanceOf[DataFrame]

  /**
   * Applies a Scala function to this frame.
   *
   * @param f transformation to run against this frame
   * @return the result of `f(this)`
   */
  def invoke(f: DataFrame => DataFrame): DataFrame = f(this)
}
/**
 * Holds the Clojure accessor that [[improveFrame]] uses to read a frame's
 * `logicalPlan`.
 *
 * The accessor is created by evaluating a constant Clojure snippet. Keeping it
 * in an object means the snippet is evaluated once, when this object is first
 * referenced, instead of on every implicit conversion.
 */
object LogicalPlanReader {

  // Clojure function that calls `.logicalPlan` on its argument through
  // Clojure's Java interop rather than through Scala-compiled code.
  private val reader: IFn =
    Clojure.`var`("clojure.core", "load-string")
      .invoke("""(fn [x] (.logicalPlan x))""")
      .asInstanceOf[IFn]

  /**
   * Reads `logicalPlan` from the given frame via the Clojure accessor.
   *
   * @param df frame whose plan is wanted
   * @return the value of `df.logicalPlan`, cast to `LogicalPlan`
   */
  def read(df: DataFrame): LogicalPlan =
    reader.invoke(df).asInstanceOf[LogicalPlan]
}

/**
 * Implicitly wraps a `DataFrame` in a [[BetterDataFrame]] that shares its
 * `sqlContext` and logical plan.
 *
 * @param df frame to wrap
 * @return a new `BetterDataFrame` built from `df.sqlContext` and `df`'s logical plan
 */
implicit def improveFrame(df: DataFrame): BetterDataFrame =
  new BetterDataFrame(df.sqlContext, LogicalPlanReader.read(df))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment