Skip to content

Instantly share code, notes, and snippets.

@codeaperature
Last active May 7, 2016 22:07
Show Gist options
  • Save codeaperature/d57681b2fc3c9ce78adaf63f07662fb6 to your computer and use it in GitHub Desktop.
Save codeaperature/d57681b2fc3c9ce78adaf63f07662fb6 to your computer and use it in GitHub Desktop.

Tricks I Needed to Manipulate DataFrames in Spark

Create an empty DataFrame

case class ns(name : String, age : Integer, grp : Integer)
sc.parallelize(List[ns]()).toDF.schema

Windowing over a DataFrame

It's not possible to aggregate with collect_set or collect_list with Spark 1.6.1 - It's going to be fixed in 2.0. See http://stackoverflow.com/questions/36963616/spark-scala-dataframes-windowing-to-have-a-wrappedarraystring-accumulative-set

Getting Schema from Case Class

scala> case class ns(name : String, age : Integer, grp : Integer)
defined class ns

scala> import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.ScalaReflection

scala> val schema = ScalaReflection.schemaFor[ns].dataType.asInstanceOf[StructType]
schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true), StructField(grp,IntegerType,true))

Forcing a Case Class Schema on JSON RDD item:

val df= sqlContext.
    read.schema(ScalaReflection.schemaFor[SomeCaseClass].dataType.asInstanceOf[StructType]).
    json(someRDDwithJSONasSecondInTuple.map(kvpair => kvpair._2))

Convert part of DataFrame to JSON

See - http://stackoverflow.com/questions/36157810/spark-row-to-json

Case Classes with more than 22 items (Scala 2.11 allows more than 22 items)

It's painful but nesessary http://stackoverflow.com/questions/20258417/how-to-get-around-the-scala-case-class-limit-of-22-fields

About Partitioning

val shufflePartitions = 8
val partitioner = new HashPartitioner(shufflePartitions)
val dfTmp =
    dataframe1.
    unionAll(dataframe2)

// See the following for the reasons behind the following code
// http://stackoverflow.com/questions/30995699/how-to-define-partitioning-of-a-spark-dataframe
// Hash the code by partitions
val partitionedDF = sqlContext.createDataFrame(
    dfTmp.rdd.map(row => (row.getString(0), row)).partitionBy(partitioner).values,
    dfTmp.schema
)

DataFrames Shuffling Control of Partitions

When operating on DataFrames for operations like .distinct, you can endup splitting into a de facto 200 partitions, here is a way set the shuffles.

sqlContext.sql("SET spark.sql.shuffle.partitions= 16")

Getting DB / KV items related to a DataFrame

import scalikejdbc._
object SetupJDBC {
  def apply(driver: String, host: String, user: String, password: String): Unit = {
    Class.forName("org.postgresql.Driver")
    ConnectionPool.singleton(host, user, password)
  }

}
val partitionedDF = idDF.rdd.
  mapPartitions{iter =>
    SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
    iter.map{ elem =>
      (elem(0).toString,
        DB.readOnly { implicit session =>
          sql"SELECT jsonStrColumn FROM myTable WHERE id = ${elem(0)}::varchar"
            .map { resultSet => resultSet.string(1) }.single.apply()
        }
        )
    }
  }.
  filter(x => x._2.isDefined).
  map(kvPair => (kvPair._1, kvPair._2.get))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment