Hacking Spark internals to add custom Rules for Logical Plan
// Declared inside org.apache.spark.sql so the code can reach Spark's
// package-private internals (the SparkSession constructor, SessionState, etc.).
package org.apache.spark.sql

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.{SQLConf, SessionState}
// A SparkSession whose SessionState is swapped for our own implementation.
case class MySparkSession(override val sparkContext: SparkContext) extends SparkSession(sparkContext) {
  override lazy val sessionState = MySessionState(this)
}
// A SessionState that wires a custom Analyzer into the session.
case class MySessionState(sparkSession: SparkSession) extends SessionState(sparkSession) {
  override lazy val conf: SQLConf = new SQLConf
  override lazy val catalog = new SessionCatalog(
    sparkSession.sharedState.externalCatalog,
    sparkSession.sharedState.globalTempViewManager,
    functionResourceLoader,
    functionRegistry,
    conf,
    newHadoopConf())
  override lazy val analyzer = new MyAnalyzer(catalog, conf)
}
// A placeholder rule: it only logs, then returns the plan unchanged.
object MyRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    println("yo, the PRINTLN works!!!")
    // TODO: make some changes to the LogicalPlan :)
    plan
  }
}
// An Analyzer that appends MyRule to the extended resolution rules.
class MyAnalyzer(catalog: SessionCatalog, conf: CatalystConf) extends Analyzer(catalog, conf, conf.optimizerMaxIterations) {
  override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Seq(MyRule)
}
object SparkathonMarch2017 extends App {
  val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkathon")
  val context: SparkContext = new SparkContext(conf)
  val session: SparkSession = MySparkSession(context)
  // explain() prints the plans itself and returns Unit, so no println wrapper is needed.
  session.sql("show tables").explain(extended = true)
}
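
Note that later Spark releases (2.2+) expose an experimental SparkSessionExtensions API that makes this kind of internals hacking unnecessary for injecting resolution rules. A minimal sketch, assuming Spark 2.2 or later is on the classpath (the ExtensionsExample and MyExtensionRule names are illustrative, not from the gist):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// The same placeholder rule as above, repeated so this sketch is self-contained.
object MyExtensionRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    println("MyExtensionRule applied during analysis")
    plan
  }
}

object ExtensionsExample extends App { // illustrative name
  // injectResolutionRule registers an analyzer resolution rule through the
  // public builder API, so nothing in org.apache.spark.sql has to be overridden.
  val session = SparkSession.builder()
    .master("local[*]")
    .appName("sparkathon")
    .withExtensions(extensions => extensions.injectResolutionRule(_ => MyExtensionRule))
    .getOrCreate()

  session.sql("show tables").explain(extended = true)
}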