// Gist by @DmytroMitin, last active October 3, 2020.
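// Creating a Spark Dataset for a case class whose Encoder is resolved at runtime:
// Load[T] is instantiated through a runtime-reflection constructor mirror, and the
// implicit Encoder[T] it needs is materialized with a ToolBox from the class name.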
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}
import scala.reflect.runtime
import scala.reflect.runtime.universe._
import scala.tools.reflect.ToolBox
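// ToolBox comes from the scala-compiler artifact; the rest is spark-sql and scala-reflect.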
object App {
  lazy val spark: SparkSession = SparkSession.builder
    .appName(getClass.getName)
    .master("local")
    .getOrCreate()

  object schema {
    case class Users(name: String,
                     favorite_color: String,
                     favorite_numbers: Array[Int])

    object Users {
      // implicit val usersEnc: Encoder[Users] = spark.implicits.newProductEncoder[Users]
    }
  }
  case class FileSystem[T: Encoder](inputPath: String, spark: SparkSession, saveMode: String, tableName: String) {
    // Stub data source: a real implementation would read from inputPath; here it just returns an empty Dataset[T].
    def provideData(metadata: Boolean, outputPath: String): Dataset[T] = spark.emptyDataset[T]
  }
  trait Loader[T] {
    def Load: Dataset[T]
  }

  // The context bound `T <: Product : Encoder` desugars into an extra implicit
  // Encoder[T] constructor parameter; in main it is passed explicitly through the
  // constructor mirror.
  class Load[T <: Product: Encoder](val tableName: String,
                                    val inputPath: String,
                                    val spark: SparkSession,
                                    val saveMode: String,
                                    val outputPath: String,
                                    val metadata: Boolean) extends Loader[T] {
    val fileSystemSourceInstance: FileSystem[T] =
      new FileSystem[T](inputPath, spark, saveMode, tableName)

    override def Load: Dataset[T] =
      fileSystemSourceInstance.provideData(metadata, outputPath).as[T]
  }
  def main(args: Array[String]): Unit = {
    val tableName = "tableName"
    val inputPath = "inputPath"
    val saveMode = "saveMode"
    val outputPath = "outputPath"
    val metadata = true

    // Compile-time path: Encoder[schema.Users] is found implicitly via spark.implicits._.
    import spark.implicits._
    val dataset = new Load[schema.Users](tableName, inputPath, spark,
      saveMode,
      outputPath + tableName,
      metadata).Load
    println(dataset) // [name: string, favorite_color: string ... 1 more field]
    // Runtime path: reflect the Load class and its constructor.
    val currentMirror = runtime.currentMirror
    val loadType = typeOf[Load[_]]
    val classSymbol = loadType.typeSymbol.asClass
    val classMirror = currentMirror.reflectClass(classSymbol)
    val constructorSymbol = loadType.decl(termNames.CONSTRUCTOR).asMethod
    val constructorMirror = classMirror.reflectConstructor(constructorSymbol)

    // Materialize Encoder[App.schema.Users] at runtime by compiling and evaluating
    // a small snippet with a ToolBox.
    val toolbox = ToolBox(currentMirror).mkToolBox()
    val className = "App.schema.Users"
    val encoderInstance = toolbox.eval(toolbox.parse(
      s"""import App.spark.implicits._
         |import org.apache.spark.sql.Encoder
         |implicitly[Encoder[$className]]""".stripMargin))
    // Alternative: let the toolbox infer the implicit Encoder instead of parsing source code:
    // val encoderType = appliedType(
    //   typeOf[Encoder[_]].typeConstructor.typeSymbol,
    //   currentMirror.staticClass(className).toType
    // )
    // val encoderTree = toolbox.inferImplicitValue(encoderType, silent = false)
    // val encoderInstance = toolbox.eval(toolbox.untypecheck(encoderTree))
    // The reflectively obtained encoder is passed as the last (implicit) constructor argument.
    val dataset1 = constructorMirror(tableName, inputPath, spark,
      saveMode,
      outputPath + tableName,
      metadata, encoderInstance).asInstanceOf[Load[_]].Load
    println(dataset1) // [name: string, favorite_color: string ... 1 more field]
  }
}
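// A minimal build sketch for running this gist (the versions below are assumptions; adjust to your setup):
//   scalaVersion := "2.12.12"
//   libraryDependencies ++= Seq(
//     "org.apache.spark" %% "spark-sql"      % "3.0.1",
//     "org.scala-lang"    % "scala-compiler" % scalaVersion.value // provides scala.tools.reflect.ToolBox
//   )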