@akhld
Created October 28, 2016 04:09
Reading multiple Parquet datasets, partitioning by a date column, and appending them to a table
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.functions.lit
import org.joda.time.DateTime

// Base HDFS location; each day's k-means predictions live in a dated folder.
val storage = "hdfs://nameservice1/user/plutus/data/kmeans_prediction_par_"

// Build (date, path) pairs for the last 30 days and keep only the folders that exist on HDFS.
val penInputs = (1 to 30).map { x =>
  val date = DateTime.now().minusDays(x).toString("yyyy-MM-dd")
  (date, storage + date)
}.filter { case (_, path) =>
  HdfsTools.checkIfFolderExists(new Path(path))
}
penInputs.foreach(println)

// Read each dated folder as a Parquet DataFrame, keeping its date alongside it.
val parquets = penInputs.map { case (date, path) =>
  (date, hiveContext.read.parquet(path))
}

// Tag every DataFrame with its date as a literal "datadate" column to partition on.
val dfs = parquets.map { case (date, df) =>
  df.withColumn("datadate", lit(date))
}

// Sanity-check the first DataFrame before writing.
dfs(0).take(10).foreach(println)
dfs(0).printSchema()

// Write each day's DataFrame into the table, partitioned by datadate.
// Note: insertIntoTable is not a stock Spark API; it comes from a custom/implicit
// DataFrame extension available in this environment.
dfs.foreach { df =>
  df.insertIntoTable(
    table = "plutus.pen_data",
    overwrite = true,
    partitionBy = Seq("datadate"),
    createTable = true,
    tableStore = ParquetStore(),
    distributeBy = NoDistributeBy
  )
}
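
For readers without that insertIntoTable helper, a rough equivalent using only the stock DataFrameWriter API might look like the sketch below. It is an assumption-laden alternative, not the author's original code: it appends rather than overwriting, reuses the same table and column names, and assumes the Hive metastore allows creating the table on first write.

// Minimal sketch using stock Spark APIs instead of the custom insertIntoTable helper.
// Assumes the same dfs sequence and that appending (rather than overwriting) is acceptable.
import org.apache.spark.sql.SaveMode

dfs.foreach { df =>
  df.write
    .mode(SaveMode.Append)          // append new daily partitions to the table
    .partitionBy("datadate")        // partition by the literal date column added above
    .format("parquet")
    .saveAsTable("plutus.pen_data") // creates the table on the first write, appends afterwards
}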