Last active
August 29, 2015 14:15
-
-
Save ajnavarro/a5eb50fc5e9741ee480f to your computer and use it in GitHub Desktop.
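Example of how to apply Catalyst analyzer and optimizer rules to a logical plan, one rule at a time. The plan listing below is the printed output of the Scala program that follows it. That run's "WITH CASTS" step inserted no casts, which is why `1 = '1'` folds to `false` and the result is empty.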
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
==== ORIGINAL QUERY ====
'Project ['a.name.]
 'Filter ('identification = 2)
  'Subquery a
   'Project ['name,'id AS identification#5]
    'Filter (1 = 1)
     'UnresolvedRelation [events], None
==== RESOLVE RELATIONS ====
'Project ['a.name.]
 'Filter ('identification = 2)
  'Subquery a
   'Project ['name,'id AS identification#5]
    Filter (1 = 1)
     Subquery events
      LogicalRDD [name#0,id#1], MapPartitionsRDD[1] at mapPartitions at ExistingRDD.scala:35
==== RESOLVE REFERENCES ====
Project [name#0]
 Filter (identification#5 = 2)
  Subquery a
   Project [name#0,id#1 AS identification#5]
    Filter (1 = 1)
     Subquery events
      LogicalRDD [name#0,id#1], MapPartitionsRDD[1] at mapPartitions at ExistingRDD.scala:35
==== WITH CASTS ====
Project [name#0]
 Filter (identification#5 = 2)
  Subquery a
   Project [name#0,id#1 AS identification#5]
    Filter (1 = 1)
     Subquery events
      LogicalRDD [name#0,id#1], MapPartitionsRDD[1] at mapPartitions at ExistingRDD.scala:35
==== ANALYZED PLAN ====
Project [name#0]
 Filter (identification#5 = 2)
  Project [name#0,id#1 AS identification#5]
   Filter (1 = 1)
    LogicalRDD [name#0,id#1], MapPartitionsRDD[1] at mapPartitions at ExistingRDD.scala:35
==== FOLDED PLAN ====
Project [name#0]
 Filter (identification#5 = 2)
  Project [name#0,id#1 AS identification#5]
   Filter false
    LogicalRDD [name#0,id#1], MapPartitionsRDD[1] at mapPartitions at ExistingRDD.scala:35
==== SIMPLIFY FILTERS ====
Project [name#0]
 Filter (identification#5 = 2)
  Project [name#0,id#1 AS identification#5]
   LocalRelation [name#0,id#1], []
==== PUSHED FILTERS ====
Project [name#0]
 Project [name#0,id#1 AS identification#5]
  Filter (id#1 = 2)
   LocalRelation [name#0,id#1], []
==== COLUMN PRUNING ====
Project [name#0]
 Filter (id#1 = 2)
  LocalRelation [name#0,id#1], []
==== PHYSICAL PLAN ====
Project [name#0]
 Filter (id#1 = 2)
  LocalTableScan [name#0,id#1], []
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.spark.sql | |
import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators | |
import org.apache.spark.sql.catalyst.optimizer.{ColumnPruning, ConstantFolding, PushPredicateThroughProject, SimplifyFilters} | |
import org.apache.spark.{SparkConf, SparkContext} | |
/**
 * One row of the demo `events` table.
 *
 * The table's columns are `name` and `id`, taken from these constructor fields in this order.
 * Renaming or reordering them changes the schema the SQL query runs against.
 */
case class E(
  name: String,
  id: Int
)
/**
 * Walks one SQL query through Catalyst step by step.
 *
 * Each analyzer or optimizer rule is applied by hand, and the resulting plan is printed
 * under a `==== ... ====` header. The final logical plan is then turned into a physical
 * plan and executed.
 *
 * NOTE(review): this object lives in `org.apache.spark.sql`, presumably so it can reach
 * `SQLContext.analyzer`, `SQLContext.planner` and `DataFrame.logicalPlan`, which appear to
 * be package-private in the targeted Spark version. Confirm before moving it.
 */
object RuleMain {
  import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

  /**
   * Entry point: runs the demo on a local two-thread SparkContext and always stops it.
   *
   * Uses an explicit `main` rather than `extends App`, because Spark's documentation
   * warns that subclasses of `scala.App` may not work correctly.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("test-app")
    val sc = new SparkContext(conf)
    try {
      run(sc, new SQLContext(sc))
    } finally {
      // Stop the context even if a rule or the query throws.
      sc.stop()
    }
  }

  /** Registers the `events` table, then applies and prints each rule step in turn. */
  private def run(sc: SparkContext, sqlc: SQLContext): Unit = {
    import sqlc.implicits._

    // 100 rows: name = "1".."100", id = 1..100.
    sc.parallelize((1 to 100).map(i => E(i.toString, i))).registerTempTable("events")

    // Both WHERE clauses compare an integer with a string literal, so the plan only
    // evaluates as intended once type coercion has inserted casts.
    val originalQuery = sqlc.sql(
      """
        |SELECT a.name
        |FROM (
        | SELECT name, id AS identification
        | FROM events
        | WHERE 1='1') a
        |WHERE identification = '2'
      """.stripMargin)

    println("==== ORIGINAL QUERY ====")
    println(originalQuery.queryExecution.logical)

    println("==== RESOLVE RELATIONS ====")
    val resolvedRelations = sqlc.analyzer.ResolveRelations(originalQuery.logicalPlan)
    println(resolvedRelations)

    println("==== RESOLVE REFERENCES ====")
    val resolvedReferences = sqlc.analyzer.ResolveReferences(resolvedRelations)
    println(resolvedReferences)

    // The casts come from the analyzer's type-coercion rules; re-running
    // ResolveReferences adds none. Without them, ConstantFolding compares Int 1 with
    // String "1" and folds `1 = '1'` to false. SimplifyFilters then swaps the scan for
    // an empty LocalRelation, and the query returns no rows.
    println("==== WITH CASTS ====")
    val withCasts = coerceTypes(sqlc, resolvedReferences)
    println(withCasts)

    println("==== ANALYZED PLAN ====")
    val analyzedPlan = EliminateAnalysisOperators(withCasts)
    println(analyzedPlan)

    println("==== FOLDED PLAN ====")
    val foldedPlan = ConstantFolding(analyzedPlan)
    println(foldedPlan)

    println("==== SIMPLIFY FILTERS ====")
    val simplifiedFilters = SimplifyFilters(foldedPlan)
    println(simplifiedFilters)

    println("==== PUSHED FILTERS ====")
    val pushedFilters = PushPredicateThroughProject(simplifiedFilters)
    println(pushedFilters)

    println("==== COLUMN PRUNING ====")
    val simplifiedProjects = ColumnPruning(pushedFilters)
    println(simplifiedProjects)

    println("==== PHYSICAL PLAN ====")
    // Takes the first physical plan the planner proposes.
    val physicalPlan = sqlc.planner(simplifiedProjects).next()
    println(physicalPlan)

    // Print the first column (`name`) of every result row.
    physicalPlan.executeCollect().foreach(row => println(row(0)))
  }

  /**
   * Applies the analyzer's type-coercion rules, in list order, repeatedly until the plan
   * stops changing or `maxPasses` passes have run.
   *
   * A single pass may not be enough: a cast added by one rule can make another rule
   * applicable. The pass limit guards against rules that never settle.
   *
   * @param sqlc      context whose analyzer supplies `typeCoercionRules`
   * @param plan      a resolved logical plan
   * @param maxPasses upper bound on the number of passes over the rule list
   * @return the plan with casts inserted
   */
  @scala.annotation.tailrec
  private def coerceTypes(sqlc: SQLContext, plan: LogicalPlan, maxPasses: Int = 100): LogicalPlan = {
    val next = sqlc.analyzer.typeCoercionRules.foldLeft(plan)((current, rule) => rule(current))
    if (maxPasses <= 1 || next.fastEquals(plan)) next
    else coerceTypes(sqlc, next, maxPasses - 1)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment