import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord
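// Scenario (summarizing the steps below): insert one batch into partition value "batch1" on a
// MERGE_ON_READ table, then replay unique updates through the GLOBAL_SIMPLE index while moving the
// partition value ("batch1" -> "batch2" -> "batch3"), and after each commit check whether any record
// key ends up appearing more than once.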
val tableName = "hudi_trips_mor_global"
val basePath = "file:///tmp/hudi_trips_mor_global"
val dataGen = new DataGenerator
// spark-shell
// Batch 1: insert 1000 records into partition value "batch1" (col1 is the partition path field).
val inserts = convertToStringList(dataGen.generateInserts(1000))
val df0 = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df0.withColumn("col1", org.apache.spark.sql.functions.lit("batch1")).
  withColumn("partitionpath", org.apache.spark.sql.functions.lit("path1")).
  write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "col1").
  option("hoodie.metadata.enable", "false").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)
// Check per-partition record counts after the initial insert.
spark.
  read.
  format("hudi").
  load(basePath).createOrReplaceTempView("tbl1")
spark.sql("select col1, count(*) from tbl1 group by 1 order by 2 desc").show()
// Batch 2 of writes: unique updates to the same records, still targeting partition value "batch1",
// now with the GLOBAL_SIMPLE index explicitly enabled.
val updates = convertToStringList(dataGen.generateUniqueUpdates(1000))
val df1 = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df1.withColumn("col1", org.apache.spark.sql.functions.lit("batch1")).
  withColumn("partitionpath", org.apache.spark.sql.functions.lit("path1")).
  write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "col1").
  option("hoodie.metadata.enable", "false").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  option("hoodie.index.type", "GLOBAL_SIMPLE").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
// Check per-partition record counts after the in-place update.
spark.
  read.
  format("hudi").
  load(basePath).createOrReplaceTempView("tbl2")
spark.sql("select col1, count(*) from tbl2 group by 1 order by 2 desc").show()
// Next round of unique updates, this time routed to a new partition value "batch2";
// the record keys already exist under "batch1", so the global index has to resolve them across partitions.
val updates = convertToStringList(dataGen.generateUniqueUpdates(1000))
val df1 = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df1.withColumn("col1", org.apache.spark.sql.functions.lit("batch2")).
  withColumn("partitionpath", org.apache.spark.sql.functions.lit("path1")).
  write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "col1").
  option("hoodie.metadata.enable", "false").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  option("hoodie.index.type", "GLOBAL_SIMPLE").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
// Check how records are distributed across "batch1" and "batch2" after the cross-partition update.
spark.
  read.
  format("hudi").
  load(basePath).createOrReplaceTempView("tbl3")
spark.sql("select col1, count(*) from tbl3 group by 1 order by 2 desc").show()
// Another round of unique updates, again targeting partition value "batch2".
val updates = convertToStringList(dataGen.generateUniqueUpdates(1000))
val df1 = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df1.withColumn("col1", org.apache.spark.sql.functions.lit("batch2")).
  withColumn("partitionpath", org.apache.spark.sql.functions.lit("path1")).
  write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "col1").
  option("hoodie.metadata.enable", "false").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  option("hoodie.index.type", "GLOBAL_SIMPLE").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
// Re-check per-partition counts, and look for record keys that now appear more than once.
spark.
  read.
  format("hudi").
  load(basePath).createOrReplaceTempView("tbl4")
spark.sql("select col1, count(*) from tbl4 group by 1 order by 2 desc").show()
spark.sql("select * from (select uuid, count(*) as count from tbl4 group by 1) where count > 1").show()
// Final round of unique updates, moving the records to partition value "batch3".
val updates = convertToStringList(dataGen.generateUniqueUpdates(1000))
val df1 = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df1.withColumn("col1", org.apache.spark.sql.functions.lit("batch3")).
  withColumn("partitionpath", org.apache.spark.sql.functions.lit("path1")).
  write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "col1").
  option("hoodie.metadata.enable", "false").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  option("hoodie.index.type", "GLOBAL_SIMPLE").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
// Final check: per-partition counts, any duplicated record keys, and how many keys are duplicated.
spark.
  read.
  format("hudi").
  load(basePath).createOrReplaceTempView("tbl5")
spark.sql("select col1, count(*) from tbl5 group by 1 order by 2 desc").show()
spark.sql("select * from (select uuid, count(*) as count from tbl5 group by 1) where count > 1").show()
spark.sql("select count(*) from (select uuid, count(*) as count from tbl5 group by 1) where count > 1").show()