Skip to content

Instantly share code, notes, and snippets.

@ioleo
Last active March 22, 2017 20:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ioleo/5a14dbca9b52b841cb97d17ba952943f to your computer and use it in GitHub Desktop.
Save ioleo/5a14dbca9b52b841cb97d17ba952943f to your computer and use it in GitHub Desktop.
Hacking Spark internals to add custom rules for the logical plan
package org.apache.spark.sql
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.{SQLConf, SessionState}
/**
 * A [[SparkSession]] built on an existing [[SparkContext]] whose session state is a
 * [[MySessionState]], so query analysis goes through the custom analyzer wired up there.
 *
 * @param sparkContext the context this session runs on
 */
case class MySparkSession(override val sparkContext: SparkContext)
  extends SparkSession(sparkContext) {

  // Created on first access; the session passes itself so the state can reach its shared state.
  override lazy val sessionState: MySessionState = new MySessionState(this)
}
/**
 * SessionState whose analyzer is a [[MyAnalyzer]], so [[MyRule]] takes part in analysis.
 *
 * `conf` and `catalog` are redefined here and fed into the custom analyzer below.
 * NOTE(review): these appear to mirror the parent SessionState's own definitions; if so they
 * are redundant — confirm against the Spark version in use.
 * NOTE(review): overriding `analyzer` replaces the parent's analyzer entirely, so any extra
 * resolution / check rules the stock SessionState registers on its analyzer are presumably not
 * carried over here — verify that queries over data-source tables still resolve.
 *
 * @param sparkSession the owning session; its shared state supplies the external catalog and
 *                     the global temp view manager
 */
case class MySessionState(sparkSession: SparkSession) extends SessionState(sparkSession) {
// A fresh SQLConf instance for this session state.
override lazy val conf: SQLConf = new SQLConf
// Session catalog backed by the session's shared external catalog and global temp views,
// using this state's function registry/loader, conf, and a new Hadoop configuration.
override lazy val catalog = new SessionCatalog(
sparkSession.sharedState.externalCatalog,
sparkSession.sharedState.globalTempViewManager,
functionResourceLoader,
functionRegistry,
conf,
newHadoopConf())
// The hook point: swap in the analyzer that carries MyRule.
override lazy val analyzer = new MyAnalyzer(catalog, conf)
}
/**
 * Placeholder logical-plan rule: emits a marker line and hands the plan back unchanged.
 *
 * NOTE(review): when registered as an analyzer resolution rule it may be invoked more than once
 * per query (once per analyzer iteration) — confirm if the marker count matters.
 */
object MyRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    // Marker output (Polish for roughly "yoooo, PRINTLN works!!!") proving the rule ran.
    println("yoooo działa PRINTLN!!!")
    // TODO: actually rewrite the LogicalPlan here; for now it is returned as-is.
    plan
  }
}
/**
 * Analyzer whose only extended resolution rule is [[MyRule]].
 *
 * The maximum number of analyzer iterations is taken from `conf.optimizerMaxIterations`.
 *
 * @param catalog session catalog used to resolve relations and functions
 * @param conf    catalyst configuration
 */
class MyAnalyzer(catalog: SessionCatalog, conf: CatalystConf)
  extends Analyzer(catalog, conf, conf.optimizerMaxIterations) {

  // Extension hook that Analyzer presumably runs alongside its built-in resolution rules —
  // confirm for the Spark version in use.
  override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = MyRule :: Nil
}
/**
 * Demo entry point: starts a local [[SparkContext]] and wraps it in [[MySparkSession]] so the
 * custom analyzer (and therefore [[MyRule]]) is used. It then prints the parsed, analyzed,
 * optimized and physical plans of a simple query.
 */
object SparkathonMarch2017 extends App {
  val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkathon")
  val context: SparkContext = new SparkContext(conf)
  val session: SparkSession = MySparkSession(context)

  try {
    // `explain` prints the plans itself and returns Unit; wrapping it in println (as before)
    // only appended a stray "()" line to the output.
    session.sql("show tables").explain(extended = true)
  } finally {
    // Release the local Spark resources even if the query fails.
    context.stop()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment