Skip to content

Instantly share code, notes, and snippets.

@ajnavarro
Last active August 29, 2015 14:15
Show Gist options
  • Save ajnavarro/a5eb50fc5e9741ee480f to your computer and use it in GitHub Desktop.
Save ajnavarro/a5eb50fc5e9741ee480f to your computer and use it in GitHub Desktop.
Example of how to apply Catalyst rules to a logical plan one step at a time
==== ORIGINAL QUERY ====
'Project ['a.name.]
'Filter ('identification = 2)
'Subquery a
'Project ['name,'id AS identification#5]
'Filter (1 = 1)
'UnresolvedRelation [events], None
==== RESOLVE RELATIONS ====
'Project ['a.name.]
'Filter ('identification = 2)
'Subquery a
'Project ['name,'id AS identification#5]
Filter (1 = 1)
Subquery events
LogicalRDD [name#0,id#1], MapPartitionsRDD[1] at mapPartitions at ExistingRDD.scala:35
==== RESOLVE REFERENCES ====
Project [name#0]
Filter (identification#5 = 2)
Subquery a
Project [name#0,id#1 AS identification#5]
Filter (1 = 1)
Subquery events
LogicalRDD [name#0,id#1], MapPartitionsRDD[1] at mapPartitions at ExistingRDD.scala:35
==== WITH CASTS ====
Project [name#0]
Filter (identification#5 = 2)
Subquery a
Project [name#0,id#1 AS identification#5]
Filter (1 = 1)
Subquery events
LogicalRDD [name#0,id#1], MapPartitionsRDD[1] at mapPartitions at ExistingRDD.scala:35
==== ANALYZED PLAN ====
Project [name#0]
Filter (identification#5 = 2)
Project [name#0,id#1 AS identification#5]
Filter (1 = 1)
LogicalRDD [name#0,id#1], MapPartitionsRDD[1] at mapPartitions at ExistingRDD.scala:35
==== FOLDED PLAN ====
Project [name#0]
Filter (identification#5 = 2)
Project [name#0,id#1 AS identification#5]
Filter false
LogicalRDD [name#0,id#1], MapPartitionsRDD[1] at mapPartitions at ExistingRDD.scala:35
==== SIMPLIFY FILTERS ====
Project [name#0]
Filter (identification#5 = 2)
Project [name#0,id#1 AS identification#5]
LocalRelation [name#0,id#1], []
==== PUSHED FILTERS ====
Project [name#0]
Project [name#0,id#1 AS identification#5]
Filter (id#1 = 2)
LocalRelation [name#0,id#1], []
==== COLUMN PRUNING ====
Project [name#0]
Filter (id#1 = 2)
LocalRelation [name#0,id#1], []
==== PHYSICAL PLAN ====
Project [name#0]
Filter (id#1 = 2)
LocalTableScan [name#0,id#1], []
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
import org.apache.spark.sql.catalyst.optimizer.{ColumnPruning, ConstantFolding, PushPredicateThroughProject, SimplifyFilters}
import org.apache.spark.{SparkConf, SparkContext}
/** One row of the `events` temp table: a string `name` and an integer `id`. */
final case class E(name: String, id: Int)
/**
 * Walks a small Spark SQL query through the Catalyst pipeline one rule at a
 * time, printing the logical plan after each step. It then asks the planner
 * for a physical plan and prints the first column of every collected row.
 *
 * Defines `main` explicitly instead of extending `scala.App`: Spark's
 * documentation warns that subclasses of `scala.App` may not work correctly.
 */
object RuleMain {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("test-app")
    val sc = new SparkContext(conf)
    try {
      val sqlc = new SQLContext(sc)
      import sqlc.implicits._

      // `registerTempTable` is not an RDD method; it is reached through an
      // implicit conversion brought in by `sqlc.implicits._`.
      sc.parallelize((1 to 100).map(i => E(i.toString, i))).registerTempTable("events")

      // Both predicates compare an Int with a string literal ('1', '2'), so
      // the type-coercion step below must insert casts before constant folding.
      val originalQuery = sqlc.sql(
        """
          |SELECT a.name
          |FROM (
          | SELECT name, id AS identification
          | FROM events
          | WHERE 1='1') a
          |WHERE identification = '2'
        """.stripMargin)

      println("==== ORIGINAL QUERY ====")
      println(originalQuery.queryExecution.logical)

      println("==== RESOLVE RELATIONS ====")
      val resolvedRelations = sqlc.analyzer.ResolveRelations(originalQuery.logicalPlan)
      println(resolvedRelations)

      println("==== RESOLVE REFERENCES ====")
      val resolvedReferences = sqlc.analyzer.ResolveReferences(resolvedRelations)
      println(resolvedReferences)

      println("==== WITH CASTS ====")
      // Apply the analyzer's type-coercion rules once, in order. Re-running
      // ResolveReferences here would add no casts, leaving `1 = '1'` as an
      // Int/String comparison that ConstantFolding evaluates to `false`.
      val withCasts = sqlc.analyzer.typeCoercionRules.foldLeft(resolvedReferences) {
        (plan, rule) => rule(plan)
      }
      println(withCasts)

      println("==== ANALYZED PLAN ====")
      // Strip the Subquery wrappers left over from analysis.
      val analyzedPlan = EliminateAnalysisOperators(withCasts)
      println(analyzedPlan)

      println("==== FOLDED PLAN ====")
      val foldedPlan = ConstantFolding(analyzedPlan)
      println(foldedPlan)

      println("==== SIMPLIFY FILTERS ====")
      val simplifiedFilters = SimplifyFilters(foldedPlan)
      println(simplifiedFilters)

      println("==== PUSHED FILTERS ====")
      val pushedFilters = PushPredicateThroughProject(simplifiedFilters)
      println(pushedFilters)

      println("==== COLUMN PRUNING ====")
      val simplifiedProjects = ColumnPruning(pushedFilters)
      println(simplifiedProjects)

      println("==== PHYSICAL PLAN ====")
      // Take the first candidate physical plan the planner produces.
      val physicalPlan = sqlc.planner(simplifiedProjects).next()
      println(physicalPlan)

      // `foreach`, not `map`: rows are consumed only for the printing side effect.
      physicalPlan.executeCollect().foreach(row => println(row(0)))
    } finally {
      // Release the local SparkContext even if one of the steps above throws.
      sc.stop()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment