Last active May 7, 2016 22:07
Tricks I Needed to Manipulate DataFrames in Spark

Create an empty DataFrame

case class ns(name : String, age : Integer, grp : Integer)
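The case class on its own does not create a DataFrame yet. Below is a minimal sketch of two ways to get an empty DataFrame with that schema. It assumes Spark 2.x or later (a SparkSession, with `sqlContext` taken from it to match the rest of these notes). `EmptyDataFrameSketch` and `emptyFrames` are hypothetical names.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

// Same fields as the case class above; it must be top level so Spark can reflect on it.
case class ns(name: String, age: Integer, grp: Integer)

object EmptyDataFrameSketch {
  def emptyFrames(spark: SparkSession): (DataFrame, DataFrame) = {
    val sc = spark.sparkContext
    val sqlContext = spark.sqlContext

    // 1. Empty RDD of the case class: the schema comes from reflection on ns.
    val fromCaseClass = sqlContext.createDataFrame(sc.emptyRDD[ns])

    // 2. Empty RDD[Row] plus an explicit StructType (reused from the first frame here).
    val fromSchema = sqlContext.createDataFrame(sc.emptyRDD[Row], fromCaseClass.schema)

    (fromCaseClass, fromSchema)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("empty-df").getOrCreate()
    val (fromCaseClass, fromSchema) = emptyFrames(spark)
    fromCaseClass.printSchema()
    println(fromSchema.count()) // 0
    spark.stop()
  }
}
```

The second form is useful when the schema is only known at runtime, for example when it was built by hand or read from elsewhere.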

Windowing over a DataFrame

In Spark 1.6.1 it's not possible to aggregate with collect_set or collect_list; this is going to be fixed in 2.0. See
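Below is a minimal windowing sketch, assuming Spark 2.0 or later. From 2.0, window functions no longer need a HiveContext, and collect_list can be used over a window. It ranks rows by age inside each `grp` and also collects every name in the group. `WindowSketch`, `withWindowColumns` and the output column names are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{collect_list, row_number}

case class ns(name: String, age: Integer, grp: Integer)

object WindowSketch {
  def withWindowColumns(df: DataFrame): DataFrame = {
    // Ordered window: rank rows inside each grp by ascending age.
    val byAge = Window.partitionBy("grp").orderBy("age")
    // Unordered window: the frame is the whole grp partition.
    val wholeGroup = Window.partitionBy("grp")
    df.withColumn("rank_in_grp", row_number().over(byAge))
      .withColumn("names_in_grp", collect_list("name").over(wholeGroup))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("window").getOrCreate()
    import spark.implicits._
    val df = Seq(ns("a", 30, 1), ns("b", 25, 1), ns("c", 40, 2)).toDF()
    withWindowColumns(df).orderBy("grp", "rank_in_grp").show(false)
    spark.stop()
  }
}
```

Unlike a groupBy, the window keeps one output row per input row. Each group's collected list is repeated on every row of that group.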

Getting Schema from Case Class

scala> case class ns(name : String, age : Integer, grp : Integer)
defined class ns

scala> import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.ScalaReflection

scala> import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructType

scala> val schema = ScalaReflection.schemaFor[ns].dataType.asInstanceOf[StructType]
schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true), StructField(grp,IntegerType,true))
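ScalaReflection is part of Spark's internal catalyst package. Assuming Spark 2.0 or later, the public `Encoders.product` should give the same StructType without a SparkSession. Below is a minimal sketch; `SchemaFromCaseClass` is a hypothetical name.

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.StructType

case class ns(name: String, age: Integer, grp: Integer)

object SchemaFromCaseClass {
  // Public-API route to the case class schema (Spark 2.0+); boxed Integer fields come out nullable.
  val schema: StructType = Encoders.product[ns].schema

  def main(args: Array[String]): Unit = println(schema.treeString)
}
```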

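Forcing a Case Class Schema on a JSON RDD

// kvRDD: hypothetical name for the original (unnamed) RDD of (key, jsonString) pairs
val df = sqlContext.read.schema(schema).
    json(kvRDD.map(kvpair => kvpair._2))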
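Below is a self-contained sketch of the same idea, assuming Spark 2.2 or later, where `DataFrameReader.json` accepts a `Dataset[String]`. The pairs stand in for a key/JSON RDD, and `JsonWithCaseClassSchema` and `readWithSchema` are hypothetical names. With an explicit schema, Spark skips inference. JSON fields not in the schema are dropped, and missing ones become null.

```scala
import org.apache.spark.sql.{DataFrame, Encoders, SparkSession}
import org.apache.spark.sql.types.StructType

case class ns(name: String, age: Integer, grp: Integer)

object JsonWithCaseClassSchema {
  def readWithSchema(spark: SparkSession, pairs: Seq[(String, String)]): DataFrame = {
    import spark.implicits._
    val schema: StructType = Encoders.product[ns].schema
    // Keep only the JSON payload, the second element of each (key, json) pair.
    val jsonRDD = spark.sparkContext.parallelize(pairs).map(kvpair => kvpair._2)
    // Force the case class schema instead of letting Spark infer one.
    spark.read.schema(schema).json(spark.createDataset(jsonRDD))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("json-schema").getOrCreate()
    val pairs = Seq(
      "k1" -> """{"name":"a","age":30,"grp":1,"extra":"x"}""",
      "k2" -> """{"name":"b","age":25}""")
    readWithSchema(spark, pairs).show()
    spark.stop()
  }
}
```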

Convert part of DataFrame to JSON

See -
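Below is a minimal sketch of converting only some columns to JSON, assuming Spark 2.x, where `toJSON` returns a `Dataset[String]` (in 1.6 it returns an `RDD[String]`). It selects the columns first, then serializes each row. `PartialToJson` and `columnsToJson` are hypothetical names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

case class ns(name: String, age: Integer, grp: Integer)

object PartialToJson {
  // Project the columns of interest, then turn each remaining row into one JSON string.
  def columnsToJson(df: DataFrame, cols: Seq[String]): Array[String] =
    df.select(cols.head, cols.tail: _*).toJSON.collect()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("to-json").getOrCreate()
    import spark.implicits._
    val df = Seq(ns("a", 30, 1), ns("b", 25, 2)).toDF()
    columnsToJson(df, Seq("name", "age")).foreach(println)
    spark.stop()
  }
}
```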

Case Classes with More Than 22 Fields (Scala 2.11 allows more than 22 fields)

It's painful but necessary.
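Under Scala 2.10, the usual workaround is to write a regular class that extends `Product` by hand. Below is a minimal sketch of that pattern, shown with only four fields to keep it short; the same `productArity`/`productElement` pattern extends past 22. `WideRecord` and its fields are hypothetical. As an assumption to verify on your Spark version: Spark's reflection derives the schema from the primary constructor's parameters, so keep every field as a `val` in the constructor.

```scala
// Hypothetical wide record; only 4 fields shown, the pattern is the same for 23+.
class WideRecord(val f1: String, val f2: Int, val f3: Int, val f4: Option[String])
    extends Product with Serializable {

  // Number of fields, i.e. what a case class would generate for you.
  override def productArity: Int = 4

  // Positional access to fields, in constructor order.
  override def productElement(n: Int): Any = n match {
    case 0 => f1
    case 1 => f2
    case 2 => f3
    case 3 => f4
    case _ => throw new IndexOutOfBoundsException(n.toString)
  }

  override def canEqual(that: Any): Boolean = that.isInstanceOf[WideRecord]
}

object WideRecordDemo {
  def main(args: Array[String]): Unit =
    println(new WideRecord("a", 1, 2, None).productIterator.toList)
}
```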

About Partitioning

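import org.apache.spark.HashPartitioner
import org.apache.spark.sql.DataFrame

val shufflePartitions = 8
val partitioner = new HashPartitioner(shufflePartitions)
val dfTmp: DataFrame = ???  // original definition not preserved; column 0 holds a String key

// See the following for the reasons behind the following code
// Hash the rows by their column-0 key into shufflePartitions partitions
val partitionedDF = sqlContext.createDataFrame(
  dfTmp.rdd.map(row => (row.getString(0), row)).partitionBy(partitioner).values,
  dfTmp.schema)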
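Below is a self-contained version of the same key-hashing technique, assuming Spark 2.x. It uses a hypothetical two-column DataFrame with a String key in column 0, and `PartitionByKeySketch` and `hashPartitionByFirstColumn` are hypothetical names. Spark SQL's planner does not see the RDD partitioner, so later DataFrame joins or aggregations may still add their own shuffle.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.{DataFrame, SparkSession}

object PartitionByKeySketch {
  // Re-partition df so that all rows sharing the String key in column 0 land in the same partition.
  def hashPartitionByFirstColumn(spark: SparkSession, df: DataFrame, numPartitions: Int): DataFrame = {
    val partitioner = new HashPartitioner(numPartitions)
    val keyed = df.rdd.map(row => (row.getString(0), row)) // (key, row) pairs
    spark.createDataFrame(keyed.partitionBy(partitioner).values, df.schema)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("partition-by-key").getOrCreate()
    import spark.implicits._
    val df = Seq(("a", 1), ("b", 2), ("a", 3), ("c", 4)).toDF("key", "value")
    val partitioned = hashPartitionByFirstColumn(spark, df, 8)
    // Print (partition index, key) for every row: both "a" rows share one index.
    partitioned.rdd
      .mapPartitionsWithIndex((idx, rows) => rows.map(r => (idx, r.getString(0))))
      .collect()
      .foreach(println)
    spark.stop()
  }
}
```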

DataFrames: Controlling the Number of Shuffle Partitions

DataFrame operations that shuffle, such as .distinct, split their output into spark.sql.shuffle.partitions partitions, which defaults to 200. Here is a way to set it:

sqlContext.sql("SET spark.sql.shuffle.partitions=16")
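Below is a minimal sketch that sets the same option through `SQLContext.setConf` and checks the partition count of a `.distinct()`, assuming Spark 2.x or later. `ShufflePartitionsSketch` and `distinctPartitionCount` are hypothetical names. Adaptive query execution, on by default since Spark 3.2, can coalesce shuffle partitions, so the sketch turns it off to see the configured value directly.

```scala
import org.apache.spark.sql.SparkSession

object ShufflePartitionsSketch {
  // Returns how many partitions a DataFrame .distinct() produces after setting the shuffle width.
  def distinctPartitionCount(spark: SparkSession, shufflePartitions: Int): Int = {
    // Disable AQE so shuffle partitions are not coalesced behind our back (Spark 3.2+ default is on).
    spark.conf.set("spark.sql.adaptive.enabled", "false")
    // Same effect as sqlContext.sql("SET spark.sql.shuffle.partitions=...")
    spark.sqlContext.setConf("spark.sql.shuffle.partitions", shufflePartitions.toString)
    val df = spark.range(0, 100).selectExpr("id % 5 AS n")
    df.distinct().rdd.getNumPartitions
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("shuffle-partitions").getOrCreate()
    println(distinctPartitionCount(spark, 16))
    spark.stop()
  }
}
```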

Getting DB / KV items related to a DataFrame

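import scalikejdbc._

// Set up scalikejdbc's default connection pool once per executor JVM
object SetupJDBC {
  def apply(driver: String, url: String, user: String, password: String): Unit = synchronized {
    if (!ConnectionPool.isInitialized()) {
      Class.forName(driver) // load the JDBC driver class
      ConnectionPool.singleton(url, user, password)
    }
  }
}

// For each id in column 0 of idDF, fetch the matching JSON string; drop ids with no row
val partitionedDF = idDF.rdd.
  mapPartitions { iter =>
    SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
    iter.map { elem =>
      val json = DB.readOnly { implicit session =>
        sql"SELECT jsonStrColumn FROM myTable WHERE id = ${elem(0)}::varchar"
          .map { resultSet => resultSet.string(1) }.single.apply()
      }
      (elem, json)
    }
  }.
  filter(x => x._2.isDefined).
  map(kvPair => (kvPair._1, kvPair._2.get))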
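The core of the lookup is the body passed to `mapPartitions`. It runs the connection setup once per partition, then looks up each id lazily and keeps only the hits. Below is a minimal sketch of that body in plain Scala, with the database call replaced by a hypothetical `lookup` function so it runs without a database. `PartitionLookupSketch` and `lookupPartition` are hypothetical names. The `flatMap` over `Option` does the same job as the `filter(_._2.isDefined)` / `map(_.get)` pair.

```scala
object PartitionLookupSketch {
  // Body of a mapPartitions call: run `setup` once for the whole partition, then look up
  // each id lazily, keeping only ids that produced a value.
  def lookupPartition[K, V](ids: Iterator[K], setup: () => Unit, lookup: K => Option[V]): Iterator[(K, V)] = {
    setup()
    ids.flatMap(id => lookup(id).map(value => (id, value)))
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in for myTable: id -> JSON string.
    val table = Map("1" -> """{"v":1}""", "3" -> """{"v":3}""")
    val out = lookupPartition(Iterator("1", "2", "3"), () => println("setup"), (id: String) => table.get(id))
    out.foreach(println)
  }
}
```

In Spark, the same function would be called inside `idDF.rdd.mapPartitions`. It would receive the partition's ids, a `setup` that calls `SetupJDBC`, and a `lookup` that wraps the `DB.readOnly` query.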
