Skip to content

Instantly share code, notes, and snippets.

@pathikrit
Created July 12, 2019 13:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pathikrit/1dad58a4a1069114e2a9a0af1f1cfee3 to your computer and use it in GitHub Desktop.
Save pathikrit/1dad58a4a1069114e2a9a0af1f1cfee3 to your computer and use it in GitHub Desktop.
Spark Schema DSL
import org.apache.spark.sql.types._
import org.apache.spark.sql._
/** A small DSL for declaring Spark schemas from Scala types.
  *
  * Usage: `reader.withSchema("id".ofType[Long], "name".ofType[Option[String]])`.
  */
object SchemaDsl {

  /** Type-class instance that associates a Scala type with a Spark [[DataType]].
    *
    * @tparam ScalaType  phantom type used only to select the instance implicitly
    * @param sparkType   the Spark data type used for columns of `ScalaType`
    * @param isNullable  whether fields produced by this instance are nullable
    */
  case class ScalaToSparkType[ScalaType](sparkType: DataType, isNullable: Boolean = false) {

    /** Builds a [[StructField]] called `name` with this instance's data type and nullability. */
    def toField(name: String): StructField =
      StructField(name = name, dataType = sparkType, nullable = isNullable)
  }

  implicit val stringType: ScalaToSparkType[String] = ScalaToSparkType(StringType)
  implicit val intType: ScalaToSparkType[Int] = ScalaToSparkType(IntegerType)
  implicit val longType: ScalaToSparkType[Long] = ScalaToSparkType(LongType)

  /** Derives the instance for `Option[A]` from the instance for `A`.
    *
    * The Spark data type is reused unchanged; the only difference is that the
    * resulting field is marked nullable, which is how an optional column is
    * distinguished from a plain `A` column.
    */
  implicit def optionType[A](implicit ev: ScalaToSparkType[A]): ScalaToSparkType[Option[A]] =
    ScalaToSparkType[Option[A]](ev.sparkType, isNullable = true)

  /** Adds `ofType` to strings so a column name can be turned into a [[StructField]]. */
  implicit class ColumnDsl(s: String) {

    /** Creates a field named after this string, typed according to the implicit instance for `A`. */
    def ofType[A](implicit ev: ScalaToSparkType[A]): StructField = ev.toField(s)
  }

  /** Adds a varargs schema setter to [[DataFrameReader]]. */
  implicit class DataFrameReaderExtension(reader: DataFrameReader) {

    /** Wraps `fields` in a [[StructType]] and sets it as the reader's schema. */
    def withSchema(fields: StructField*): DataFrameReader = reader.schema(StructType(fields))
  }
}
/** Example usage of the schema DSL. */
object SchemaDslTest {
  import SchemaDsl._

  /** Configures a CSV reader with a two-column schema: `id` (Long) and `name` (Option[String]).
    *
    * NOTE(review): `path` is accepted but never used here; the configured reader
    * is returned without calling `load`, so the caller must load the data itself.
    *
    * @param path  location of the input (currently unused by this method)
    * @param spark the session used to obtain the reader
    * @return the reader with format and schema applied
    */
  def read(path: String)(implicit spark: SparkSession): DataFrameReader = {
    val csvReader = spark.read.format("csv")
    val idColumn = "id".ofType[Long]
    val nameColumn = "name".ofType[Option[String]]
    csvReader.withSchema(idColumn, nameColumn)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment