Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Spark SQL samples for Spark Meetup

Spark SQL samples for Spark Meetup

Initialize

Docker

https://github.com/ueshin/docker

$ docker run -it -p 14040:4040 spark:1.1.0-rc4
  • Spark web UI: http://${dockerhost}:14040/ (host port 14040 is mapped to the container's port 4040 by the -p flag above)

Spark

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.plans._

// Wrap the SparkContext `sc` (presumably the one spark-shell predefines — confirm)
// in a SQLContext, the entry point used below for SQL queries and the DSL.
val sqlContext = new SQLContext(sc)

// Brings sql(...), createSchemaRDD and the implicit conversions needed by the
// Symbol-based DSL ('age, "l.name".attr, ...) used below into scope.
import sqlContext._

// Row type for people.txt; its field names (name, age) become the column
// names referenced by the SQL and DSL queries below.
case class Person(name: String, age: Int)

// Parse each comma-separated line of the sample file into a Person (first
// field = name, second field trimmed and parsed as age) and wrap the
// resulting RDD[Person] as a SchemaRDD.
// NOTE(review): the relative path assumes the shell runs from the Spark
// distribution root — confirm for the docker image.
// The open paren keeps the multi-line expression pasteable into the REPL.
val people = createSchemaRDD(
  sc.textFile("examples/src/main/resources/people.txt")
    .map(line => line.split(","))
    .map(fields => Person(fields(0), fields(1).trim.toInt))
)
// Evaluated only for its REPL echo of the schema.
people.schemaString

// Make the SchemaRDD resolvable by name in sql(...) queries.
people.registerTempTable("people")

SQL sample

// SQL over the "people" temp table: names of rows whose age is in [13, 19].
val teenagers_sql = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// Field 0 of each Row is the single selected column (name); kept on one line
// so the statement is complete when pasted into the REPL.
teenagers_sql.map(row => s"Name: ${row(0)}").collect().foreach(line => println(line))

DSL sample

// The same query as teenagers_sql, written with the Symbol-based DSL.
// The parentheses only make the (unchanged) operator precedence explicit.
val teenagers_dsl = people.where(('age >= 13) && ('age <= 19)).select('name)

teenagers_dsl.map(row => s"Name: ${row(0)}").collect().foreach(line => println(line))

Compare query plans (SQL vs. DSL)

// Echo the physical plan of each query. In the transcript below they match
// apart from the attribute ids (name#0 vs name#10).
teenagers_sql.queryExecution.executedPlan
teenagers_dsl.queryExecution.executedPlan
scala> teenagers_sql.queryExecution.executedPlan
res11: org.apache.spark.sql.execution.SparkPlan =
Project [name#0]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  ExistingRdd [name#0,age#1], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:208


scala> teenagers_dsl.queryExecution.executedPlan
res12: org.apache.spark.sql.execution.SparkPlan =
Project [name#10]
 Filter ((age#11 >= 13) && (age#11 <= 19))
  ExistingRdd [name#10,age#11], MapPartitionsRDD[24] at mapPartitions at basicOperators.scala:208

Optimizations

NullPropagation, ConstantFolding

// Print the QueryExecution (its toString lists the query's plans) for a query
// built from null and constant expressions. The heading names NullPropagation
// and ConstantFolding as the optimizer rules illustrated here.
println(sql("""

  select 1 + null, 1 + 2, count(null)

""").queryExecution)

BooleanSimplification

// BooleanSimplification demo: each query combines a literal true/false with
// the same age-range predicate via and/or. Each query's QueryExecution is
// printed in turn so its plans can be compared.
// The SQL strings (including surrounding blank lines) are kept verbatim.
Seq(
  """

  select false and (age >= 13 and age <= 19) from people

""",
  """

  select true and (age >= 13 and age <= 19) from people

""",
  """

  select true or (age >= 13 and age <= 19) from people

""",
  """

  select false or (age >= 13 and age <= 19) from people

"""
).foreach(query => println(sql(query).queryExecution))

SimplifyFilters

// SimplifyFilters demo: queries whose WHERE clause is a constant true or false.
// Each query's QueryExecution is printed so its plans can be inspected.
for (query <- Seq(
  """

  select name from people where true

""",
  """

  select name from people where false

"""
)) println(sql(query).queryExecution)

CombineFilters

// Two chained DSL filters on the same SchemaRDD. The heading names
// CombineFilters as the optimizer rule this illustrates.
println(people.where('age >= 13).where('age <= 19).queryExecution)

PushPredicateThroughProject

// A filter applied on top of a projection of `age`. The heading names
// PushPredicateThroughProject as the optimizer rule this illustrates.
println(people.select('age).where('age >= 13).queryExecution)

PushPredicateThroughJoin

// Inner self-join of people on name (sides aliased l and r), then a filter on
// the left side's age. The heading names PushPredicateThroughJoin as the
// optimizer rule this illustrates.
// Leading-dot lines are safe in the REPL because they sit inside println( ... ).
println(
  people.as('l)
    .join(people.as('r), Inner, Some("l.name".attr === "r.name".attr))
    .where("l.age".attr >= 13)
    .queryExecution
)

ColumnPruning

// Left-semi self-join of people on name (aliased l and r). The heading names
// ColumnPruning as the optimizer rule this illustrates.
println(
  people.as('l)
    .join(people.as('r), LeftSemi, Some("l.name".attr === "r.name".attr))
    .queryExecution
)

Execution

// Actually run the left-semi self-join from the previous demo and collect the
// result rows to the driver. The REPL echoes the returned Array[Row]. The
// outer parens keep the multi-line expression pasteable into the REPL.
(
  people.as('l)
    .join(people.as('r), LeftSemi, Some("l.name".attr === "r.name".attr))
    .collect()
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment