// Derive a DataFrame schema from a case class via an empty RDD (needs import sqlContext.implicits._ for .toDF)
case class ns(name: String, age: Integer, grp: Integer)
sc.parallelize(List[ns]()).toDF.schema
It's not possible to aggregate with collect_set or collect_list on DataFrames in Spark 1.6.1; this is fixed in 2.0. See http://stackoverflow.com/questions/36963616/spark-scala-dataframes-windowing-to-have-a-wrappedarraystring-accumulative-set
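Once on 2.0, a minimal sketch of the aggregation that 1.6.1 can't do (df and the column names here are illustrative):
import org.apache.spark.sql.functions.collect_set
// Collect the distinct names per group into an array column
df.groupBy("grp").agg(collect_set("name"))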
scala> case class ns(name : String, age : Integer, grp : Integer)
defined class ns
scala> import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.ScalaReflection
scala> import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructType
scala> val schema = ScalaReflection.schemaFor[ns].dataType.asInstanceOf[StructType]
schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true), StructField(grp,IntegerType,true))
Use a schema derived from a case class to read JSON strings (here, the second element of a pair RDD) into a DataFrame:
val df = sqlContext.
  read.schema(ScalaReflection.schemaFor[SomeCaseClass].dataType.asInstanceOf[StructType]).
  json(someRDDwithJSONasSecondInTuple.map(kvpair => kvpair._2))
See http://stackoverflow.com/questions/36157810/spark-row-to-json
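Going the other way, DataFrame.toJSON serializes each row back to a JSON string:
val jsonStrings = df.toJSON   // RDD[String], one JSON document per row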
Getting around the Scala case-class limit of 22 fields is painful but necessary; see http://stackoverflow.com/questions/20258417/how-to-get-around-the-scala-case-class-limit-of-22-fields
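One workaround sketch for wide records: skip the case class entirely and build the StructType by hand (the field names, field count, and jsonStringRDD below are made up for illustration):
import org.apache.spark.sql.types._
// 30 string fields -- more than a case class could carry in Scala 2.10
val wideSchema = StructType((1 to 30).map(i => StructField(s"col$i", StringType, nullable = true)))
val wideDF = sqlContext.read.schema(wideSchema).json(jsonStringRDD)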
import org.apache.spark.HashPartitioner

val shufflePartitions = 8
val partitioner = new HashPartitioner(shufflePartitions)

val dfTmp = dataframe1.unionAll(dataframe2)

// See the following for the reasons behind this code:
// http://stackoverflow.com/questions/30995699/how-to-define-partitioning-of-a-spark-dataframe
// Hash-partition the rows by the value of their first (String) column
val partitionedDF = sqlContext.createDataFrame(
  dfTmp.rdd.map(row => (row.getString(0), row)).partitionBy(partitioner).values,
  dfTmp.schema
)
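A quick sanity check, assuming the snippet above ran as-is: the underlying RDD should now report the requested partition count.
println(partitionedDF.rdd.partitions.size)   // 8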
DataFrame operations that shuffle, such as .distinct, default to 200 output partitions; here is a way to set the number of shuffle partitions:
sqlContext.sql("SET spark.sql.shuffle.partitions=16")
import scalikejdbc._

object SetupJDBC {
  def apply(driver: String, url: String, user: String, password: String): Unit = {
    Class.forName(driver)   // load the JDBC driver passed in, rather than hardcoding one
    ConnectionPool.singleton(url, user, password)
  }
}
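Since SetupJDBC fires once per partition, the same executor JVM may invoke it many times; a guarded variant of apply (assuming scalikejdbc's ConnectionPool.isInitialized; verify against your scalikejdbc version) keeps the setup idempotent:
def apply(driver: String, url: String, user: String, password: String): Unit =
  // Only build the pool the first time this executor calls in
  if (!ConnectionPool.isInitialized()) {
    Class.forName(driver)
    ConnectionPool.singleton(url, user, password)
  }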
Look up a JSON column per ID over JDBC inside mapPartitions, so the connection pool is initialized once per partition:
val partitionedDF = idDF.rdd.
  mapPartitions { iter =>
    SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
    iter.map { elem =>
      ( elem(0).toString,
        DB.readOnly { implicit session =>
          sql"SELECT jsonStrColumn FROM myTable WHERE id = ${elem(0)}::varchar"
            .map { resultSet => resultSet.string(1) }.single.apply()
        }
      )
    }
  }.
  filter(x => x._2.isDefined).              // drop IDs with no match (single.apply() returned None)
  map(kvPair => (kvPair._1, kvPair._2.get))
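From here, the fetched JSON strings can be turned back into a DataFrame (the name jsonDF is illustrative):
val jsonDF = sqlContext.read.json(partitionedDF.map(_._2))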