Skip to content

Instantly share code, notes, and snippets.

@alev000
Last active August 15, 2019 22:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alev000/5be58bffb2dbc64bcdcc45fefb025a6e to your computer and use it in GitHub Desktop.
Save alev000/5be58bffb2dbc64bcdcc45fefb025a6e to your computer and use it in GitHub Desktop.
package com.example
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
/**
 * Builds a `Dataset[Row]` whose schema is generated at runtime and encodes it with a
 * `RowEncoder` derived from that schema.
 *
 * The generated schema has `size` columns named `x1` .. `xN`. The type of column `xi` is chosen
 * by `i % 3`:
 *  - 1 -> `pairType` (struct of two doubles)
 *  - 2 -> `nestedPairType` (struct of two `pairType` structs)
 *  - 0 -> `DoubleType`
 */
object DynamicSchemaExample {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").getOrCreate()
    try {
      val rowSize = 10
      val schema = makeSchema(size = rowSize)
      val enc = RowEncoder(schema)

      import spark.implicits.{localSeqToDatasetHolder, newIntEncoder}
      val inputs = localSeqToDatasetHolder(Seq(0, 1, 2))(newIntEncoder).toDS()

      println(makeRow(rowIndex = 0, size = rowSize))
      println(schema)

      // One group per distinct input value; each group emits a single row shaped by `schema`.
      // The Row encoder must be passed explicitly because there is no implicit Encoder[Row].
      val data = inputs
        .groupByKey(identity[Int])(newIntEncoder)
        .mapGroups { (x: Int, _: Iterator[Int]) =>
          makeRow(rowIndex = x, size = rowSize)
        }(enc)
      data.show()
    } finally {
      // Release the local Spark context even if the job above throws.
      spark.stop()
    }
  }

  /**
   * Value for a `pairType` column: `(x, x + 0.5)`.
   *
   * Returned as a `Row` rather than a tuple because `RowEncoder` maps every `StructType`
   * field to an external `Row`; a `scala.Tuple2` fails the encoder's external-type check.
   */
  private def makePair(x: Double): Row =
    Row(x, x + 0.5)

  /**
   * Value for a `nestedPairType` column: `((x, x + 0.25), (x + 0.5, x + 0.75))`.
   *
   * Both the outer struct and the inner structs are `Row`s, for the same reason as `makePair`.
   */
  private def makeNestedPair(x: Double): Row =
    Row(Row(x, x + 0.25), Row(x + 0.5, x + 0.75))

  private val pairType =
    StructType(Seq(StructField("_1", DoubleType), StructField("_2", DoubleType)))

  private val nestedPairType =
    StructType(Seq(StructField("_1", pairType), StructField("_2", pairType)))

  /**
   * Generates a schema with `size` columns `x1` .. `xsize`.
   *
   * NOTE: the `i % 3` dispatch here must stay in sync with the one in `makeRow`, otherwise the
   * produced rows no longer match the schema the encoder was built from.
   *
   * @param size number of columns; a value below 1 yields an empty schema
   */
  private def makeSchema(size: Int): StructType =
    StructType(1.to(size).map { i =>
      val fieldType = i % 3 match {
        case 0 => DoubleType
        case 1 => pairType
        case 2 => nestedPairType
      }
      StructField(s"x${i}", fieldType)
    })

  /**
   * Builds one row matching `makeSchema(size)`, with every column derived from `rowIndex`.
   *
   * @param rowIndex seed value; converted to `Double` for every column
   * @param size     number of columns; must equal the `size` passed to `makeSchema`
   * @return a `Row` whose struct-typed columns are themselves `Row`s
   */
  private def makeRow(rowIndex: Int, size: Int): Row = {
    val seed = rowIndex.toDouble
    // Same `i % 3` dispatch as `makeSchema`, so column `xi` gets a value of the declared type.
    val values: Seq[Any] = 1.to(size).map { i =>
      i % 3 match {
        case 0 => seed
        case 1 => makePair(seed)
        case 2 => makeNestedPair(seed)
      }
    }
    Row(values: _*)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment