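// Scenario 1: MERGE_ON_READ table with inline clustering (no sort columns).
// Flow: insert 1000 -> update 1000 -> delete 500 -> update 100 (triggers clustering),
// then validate that deleted records do not reappear after clustering.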
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
// spark-shell
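// Batch 1: insert 1000 records tagged col1 = "batch1" into a single partition ("path1").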
val inserts = convertToStringList(dataGen.generateInserts(1000))
val df0 = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df0.withColumn("col1",org.apache.spark.sql.functions.lit("batch1")).
withColumn("partitionpath",org.apache.spark.sql.functions.lit("path1")).
write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
spark.
read.
format("hudi").
load(basePath).createOrReplaceTempView("tbl1")
spark.sql("select col1, count(*) from tbl1 group by 1 order by 2 desc").show()
val updates = convertToStringList(dataGen.generateUniqueUpdates(1000))
val df1 = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df1.withColumn("col1",org.apache.spark.sql.functions.lit("batch2")).
withColumn("partitionpath",org.apache.spark.sql.functions.lit("path1")).
write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
spark.
read.
format("hudi").
load(basePath).createOrReplaceTempView("tbl2")
spark.sql("select col1, count(*) from tbl2 group by 1 order by 2 desc").show()
val deletes = convertToStringList(dataGen.generateUniqueUpdates(500))
val df2 = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
df2.withColumn("col1",org.apache.spark.sql.functions.lit("batch3")).
withColumn("partitionpath",org.apache.spark.sql.functions.lit("path1")).
write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option("hoodie.datasource.write.operation","delete").
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
spark.
read.
format("hudi").
load(basePath).createOrReplaceTempView("tbl3")
spark.sql("select col1, count(*) from tbl3 group by 1 order by 2 desc").show()
val updates2 = convertToStringList(dataGen.generateUniqueUpdates(100))
val df3 = spark.read.json(spark.sparkContext.parallelize(updates2, 2))
df3.withColumn("col1",org.apache.spark.sql.functions.lit("batch4")).
withColumn("partitionpath",org.apache.spark.sql.functions.lit("path1")).
write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option("hoodie.clustering.inline","true").
option("hoodie.clustering.inline.max.commits","2").
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
spark.
read.
format("hudi").
load(basePath).createOrReplaceTempView("tbl4")
spark.sql("select col1, count(*) from tbl4 group by 1 order by 2 desc").show()
df2.createOrReplaceTempView("del_tbl")
spark.sql("select count(*) from tbl4 a join del_tbl b on a.partitionpath = b.partitionpath and a.uuid = b.uuid").show() // expect 0
spark.sql("select count(*) from tbl4 a join tbl3 b on a.partitionpath = b.partitionpath and a.uuid = b.uuid").show() // expect 500
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord
val tableName = "hudi_trips_cow1"
val basePath = "file:///tmp/hudi_trips_cow1"
val dataGen = new DataGenerator
// spark-shell
val inserts = convertToStringList(dataGen.generateInserts(1000))
val df0 = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df0.withColumn("col1",org.apache.spark.sql.functions.lit("batch1")).
withColumn("partitionpath",org.apache.spark.sql.functions.lit("path1")).
write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
spark.
read.
format("hudi").
load(basePath).createOrReplaceTempView("tbl1")
spark.sql("select col1, count(*) from tbl1 group by 1 order by 2 desc").show()
val updates = convertToStringList(dataGen.generateUniqueUpdates(1000))
val df1 = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df1.withColumn("col1",org.apache.spark.sql.functions.lit("batch2")).
withColumn("partitionpath",org.apache.spark.sql.functions.lit("path1")).
write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
spark.
read.
format("hudi").
load(basePath).createOrReplaceTempView("tbl2")
spark.sql("select col1, count(*) from tbl2 group by 1 order by 2 desc").show()
val deletes = convertToStringList(dataGen.generateUniqueUpdates(500))
val df2 = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
df2.withColumn("col1",org.apache.spark.sql.functions.lit("batch3")).
withColumn("partitionpath",org.apache.spark.sql.functions.lit("path1")).
write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option("hoodie.datasource.write.operation","delete").
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
spark.
read.
format("hudi").
load(basePath).createOrReplaceTempView("tbl3")
spark.sql("select col1, count(*) from tbl3 group by 1 order by 2 desc").show()
val updates2 = convertToStringList(dataGen.generateUniqueUpdates(100))
val df3 = spark.read.json(spark.sparkContext.parallelize(updates2, 2))
df3.withColumn("col1",org.apache.spark.sql.functions.lit("batch4")).
withColumn("partitionpath",org.apache.spark.sql.functions.lit("path1")).
write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option("hoodie.clustering.inline","true").
option("hoodie.clustering.inline.max.commits","2").
option("hoodie.clustering.plan.strategy.sort.columns","uuid").
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
spark.
read.
format("hudi").
load(basePath).createOrReplaceTempView("tbl4")
spark.sql("select col1, count(*) from tbl4 group by 1 order by 2 desc").show()
df2.createOrReplaceTempView("del_tbl")
spark.sql("select count(*) from tbl4 a join del_tbl b on a.partitionpath = b.partitionpath and a.uuid = b.uuid").show() // expect 0
spark.sql("select count(*) from tbl4 a join tbl3 b on a.partitionpath = b.partitionpath and a.uuid = b.uuid").show() // expect 500