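// Hudi MERGE_ON_READ walkthrough for spark-shell: insert, update, and delete
// batches against a local table, then a write that triggers inline clustering.
// Each step is verified with a snapshot query; a second variant below adds a
// clustering sort column.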
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord
// spark-shell
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
// batch 1: insert 1,000 generated records, all tagged "batch1" and written
// to a single partition, "path1"
val inserts = convertToStringList(dataGen.generateInserts(1000))
val df0 = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df0.withColumn("col1", org.apache.spark.sql.functions.lit("batch1")).
  withColumn("partitionpath", org.apache.spark.sql.functions.lit("path1")).
  write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)
spark.
  read.
  format("hudi").
  load(basePath).createOrReplaceTempView("tbl1")
spark.sql("select col1, count(*) from tbl1 group by 1 order by 2 desc").show()
// batch 2: update the same 1,000 keys, now tagged "batch2"
val updates = convertToStringList(dataGen.generateUniqueUpdates(1000))
val df1 = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df1.withColumn("col1", org.apache.spark.sql.functions.lit("batch2")).
  withColumn("partitionpath", org.apache.spark.sql.functions.lit("path1")).
  write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
spark.
  read.
  format("hudi").
  load(basePath).createOrReplaceTempView("tbl2")
spark.sql("select col1, count(*) from tbl2 group by 1 order by 2 desc").show()
// batch 3: delete 500 of the existing keys. generateUniqueUpdates is used
// only to pick 500 unique existing keys; the write operation is "delete".
val deletes = convertToStringList(dataGen.generateUniqueUpdates(500))
val df2 = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
df2.withColumn("col1", org.apache.spark.sql.functions.lit("batch3")).
  withColumn("partitionpath", org.apache.spark.sql.functions.lit("path1")).
  write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option("hoodie.datasource.write.operation", "delete").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
spark.
  read.
  format("hudi").
  load(basePath).createOrReplaceTempView("tbl3")
spark.sql("select col1, count(*) from tbl3 group by 1 order by 2 desc").show()
// batch 4: update 100 keys with inline clustering enabled; with
// hoodie.clustering.inline.max.commits = 2, this commit should schedule and
// execute a clustering (replacecommit) on the timeline
val updates2 = convertToStringList(dataGen.generateUniqueUpdates(100))
val df3 = spark.read.json(spark.sparkContext.parallelize(updates2, 2))
df3.withColumn("col1", org.apache.spark.sql.functions.lit("batch4")).
  withColumn("partitionpath", org.apache.spark.sql.functions.lit("path1")).
  write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option("hoodie.clustering.inline", "true").
  option("hoodie.clustering.inline.max.commits", "2").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
spark.
  read.
  format("hudi").
  load(basePath).createOrReplaceTempView("tbl4")
spark.sql("select col1, count(*) from tbl4 group by 1 order by 2 desc").show()
// validate: deleted keys should not reappear in tbl4
df2.createOrReplaceTempView("del_tbl") // registerTempTable is deprecated; same effect
spark.sql("select count(*) from tbl4 a join del_tbl b on a.partitionpath = b.partitionpath and a.uuid = b.uuid").show() // expect 0
spark.sql("select count(*) from tbl4 a join tbl3 b on a.partitionpath = b.partitionpath and a.uuid = b.uuid").show() // expect 500
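// ---------------------------------------------------------------------------
// Second script: same flow against a fresh table ("hudi_trips_cow1"), with
// hoodie.clustering.plan.strategy.sort.columns set so clustering also sorts
// records by uuid.
// ---------------------------------------------------------------------------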
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord
// spark-shell
val tableName = "hudi_trips_cow1"
val basePath = "file:///tmp/hudi_trips_cow1"
val dataGen = new DataGenerator
// batch 1: insert 1,000 records tagged "batch1" into partition "path1"
val inserts = convertToStringList(dataGen.generateInserts(1000))
val df0 = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df0.withColumn("col1", org.apache.spark.sql.functions.lit("batch1")).
  withColumn("partitionpath", org.apache.spark.sql.functions.lit("path1")).
  write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)
spark.
  read.
  format("hudi").
  load(basePath).createOrReplaceTempView("tbl1")
spark.sql("select col1, count(*) from tbl1 group by 1 order by 2 desc").show()
// batch 2: update the same 1,000 keys, now tagged "batch2"
val updates = convertToStringList(dataGen.generateUniqueUpdates(1000))
val df1 = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df1.withColumn("col1", org.apache.spark.sql.functions.lit("batch2")).
  withColumn("partitionpath", org.apache.spark.sql.functions.lit("path1")).
  write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
spark.
  read.
  format("hudi").
  load(basePath).createOrReplaceTempView("tbl2")
spark.sql("select col1, count(*) from tbl2 group by 1 order by 2 desc").show()
// batch 3: delete 500 of the existing keys (generateUniqueUpdates only picks
// the keys; the write operation is "delete")
val deletes = convertToStringList(dataGen.generateUniqueUpdates(500))
val df2 = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
df2.withColumn("col1", org.apache.spark.sql.functions.lit("batch3")).
  withColumn("partitionpath", org.apache.spark.sql.functions.lit("path1")).
  write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option("hoodie.datasource.write.operation", "delete").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
spark.
  read.
  format("hudi").
  load(basePath).createOrReplaceTempView("tbl3")
spark.sql("select col1, count(*) from tbl3 group by 1 order by 2 desc").show()
// batch 4: update 100 keys with inline clustering enabled; unlike the first
// script, hoodie.clustering.plan.strategy.sort.columns makes clustering sort
// records by uuid
val updates2 = convertToStringList(dataGen.generateUniqueUpdates(100))
val df3 = spark.read.json(spark.sparkContext.parallelize(updates2, 2))
df3.withColumn("col1", org.apache.spark.sql.functions.lit("batch4")).
  withColumn("partitionpath", org.apache.spark.sql.functions.lit("path1")).
  write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option("hoodie.clustering.inline", "true").
  option("hoodie.clustering.inline.max.commits", "2").
  option("hoodie.clustering.plan.strategy.sort.columns", "uuid").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
spark.
  read.
  format("hudi").
  load(basePath).createOrReplaceTempView("tbl4")
spark.sql("select col1, count(*) from tbl4 group by 1 order by 2 desc").show()
// validate: deleted keys should not reappear in tbl4
df2.createOrReplaceTempView("del_tbl") // registerTempTable is deprecated; same effect
spark.sql("select count(*) from tbl4 a join del_tbl b on a.partitionpath = b.partitionpath and a.uuid = b.uuid").show() // expect 0
spark.sql("select count(*) from tbl4 a join tbl3 b on a.partitionpath = b.partitionpath and a.uuid = b.uuid").show() // expect 500