Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
SparkSQL samples for ScalaMatsuri2014

SparkSQL samples for ScalaMatsuri2014

Initialize

Docker

https://github.com/ueshin/docker

$ docker run -it -p 14040:4040 spark:1.1.0-rc4
  • Spark web UI: http://${dockerhost}:14040/ (the container's port 4040 is published on host port 14040)

Spark

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.plans._

// Build a SQLContext on the shell's SparkContext `sc` and import its members
// (e.g. sql and createSchemaRDD) for use in the rest of the session.
val sqlContext = new SQLContext(sc)

import sqlContext._

// One row of examples/src/main/resources/people.txt.
case class Person(name: String, age: Int)

// Parse comma-separated "name,age" lines into Person rows (only the age field
// is trimmed before conversion) and wrap the RDD as a SchemaRDD.
val people = createSchemaRDD(
  sc.textFile("examples/src/main/resources/people.txt")
    .map(line => line.split(","))
    .map(fields => Person(fields(0), fields(1).trim.toInt))
)
people.schemaString

// Make the rows queryable from SQL under the table name "people".
people.registerTempTable("people")

RDD sample

// Load README.md as an RDD of lines and count them.
val textFile = sc.textFile("README.md")
textFile.count()

// how many lines contain "Spark"?
textFile.filter(line => line.contains("Spark")).count()

// wordcount: split each line on runs of whitespace and drop empty tokens.
// Splitting on a single " " yields "" for blank lines, leading indentation and
// repeated spaces, and that empty string would be counted as a "word".
// The chain is wrapped in parens so it can be pasted into the REPL as-is.
val wordCounts = (
  textFile
    .flatMap(line => line.split("\\s+").filter(_.nonEmpty))
    .map(word => (word, 1))
    .reduceByKey(_ + _)
)
wordCounts.collect()

SQL sample

// Teenagers selected with a plain SQL string against the "people" table.
val teenagers_sql = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// Format each result row's first column on the cluster, then print locally.
teenagers_sql.map(row => s"Name: ${row(0)}").collect().foreach(line => println(line))

DSL sample

// The same teenager query written with the language-integrated DSL, where
// symbols such as 'age name columns of the people SchemaRDD.
val teenagers_dsl = people.where(('age >= 13) && ('age <= 19)).select('name)

// Format each result row's first column on the cluster, then print locally.
teenagers_dsl.map(row => s"Name: ${row(0)}").collect().foreach(line => println(line))

Compare query plans

// Show the physical plan chosen for the SQL-string query and for the DSL query;
// the REPL echoes each SparkPlan so the two can be compared side by side.
teenagers_sql.queryExecution.executedPlan
teenagers_dsl.queryExecution.executedPlan
scala> teenagers_sql.queryExecution.executedPlan
res11: org.apache.spark.sql.execution.SparkPlan =
Project [name#0]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  ExistingRdd [name#0,age#1], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:208


scala> teenagers_dsl.queryExecution.executedPlan
res12: org.apache.spark.sql.execution.SparkPlan =
Project [name#10]
 Filter ((age#11 >= 13) && (age#11 <= 19))
  ExistingRdd [name#10,age#11], MapPartitionsRDD[24] at mapPartitions at basicOperators.scala:208

Optimizations

NullPropagation, ConstantFolding

{
  // NullPropagation / ConstantFolding: the select list mixes arithmetic on
  // null, arithmetic on two literals and count(null); printing the
  // QueryExecution shows how the optimizer rewrites these expressions.
  val constantQuery = sql("""

  select 1 + null, 1 + 2, count(null)

""")
  println(constantQuery.queryExecution)
}

BooleanSimplification

// BooleanSimplification: combine a literal true/false with the teenager range
// predicate via AND and OR, and print each QueryExecution (one at a time, in
// order) to see how the optimizer simplifies the boolean expression.
Seq(
  """

  select false and (age >= 13 and age <= 19) from people

""",
  """

  select true and (age >= 13 and age <= 19) from people

""",
  """

  select true or (age >= 13 and age <= 19) from people

""",
  """

  select false or (age >= 13 and age <= 19) from people

"""
).foreach { query =>
  println(sql(query).queryExecution)
}

SimplifyFilters

// SimplifyFilters: a WHERE clause that is constantly true or constantly false;
// print each QueryExecution to see how the optimizer handles the filter.
Seq(
  """

  select name from people where true

""",
  """

  select name from people where false

"""
).foreach { query =>
  println(sql(query).queryExecution)
}

CombineFilters

{
  // CombineFilters: two consecutive where() calls on people; the printed
  // QueryExecution shows how the optimizer treats the stacked filters.
  val twoFilters = people.where('age >= 13).where('age <= 19)
  println(twoFilters.queryExecution)
}

PushPredicateThroughProject

{
  // PushPredicateThroughProject: the age filter is written after the
  // projection; the printed QueryExecution shows where the optimizer puts it.
  val projectedThenFiltered = people.select('age).where('age >= 13)
  println(projectedThenFiltered.queryExecution)
}

PushPredicateThroughJoin

{
  // PushPredicateThroughJoin: inner self-join of people on name, with a
  // filter on the left side's age written after the join; the printed
  // QueryExecution shows where the optimizer places that predicate.
  val sameName = "l.name".attr === "r.name".attr
  val joined = people.as('l).join(people.as('r), Inner, Some(sameName))
  println(joined.where("l.age".attr >= 13).queryExecution)
}

ColumnPruning

{
  // ColumnPruning: left-semi self-join of people on name; the printed
  // QueryExecution shows which columns each side of the join still reads.
  val sameName = "l.name".attr === "r.name".attr
  val semiJoined = people.as('l).join(people.as('r), LeftSemi, Some(sameName))
  println(semiJoined.queryExecution)
}

Execution

// Execution: run the left-semi self-join of people on name and collect the
// resulting rows (the REPL echoes the returned array). The outer parens let
// the multi-line chain be pasted into the REPL as a single expression.
(people.as('l)
  .join(people.as('r), LeftSemi, Some("l.name".attr === "r.name".attr))
  .collect())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment