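// spark-shell reproduction (gist by @nsivabalan): a Hudi MERGE_ON_READ table written
// with the GLOBAL_SIMPLE index, where successive update batches change the
// partition-path field ("col1"), followed by queries that check whether any record
// keys ("uuid") end up duplicated across partitions. Each batch reuses the vals
// `updates` and `df1`, which is fine when pasted line by line into the spark-shell REPL.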
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord
val tableName = "hudi_trips_mor_global"
val basePath = "file:///tmp/hudi_trips_mor_global"
val dataGen = new DataGenerator
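// Batch 0: insert 1000 generated records into partition col1 = "batch1".
// Note: this initial insert does not set hoodie.index.type, so the default index is used here.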
val inserts = convertToStringList(dataGen.generateInserts(1000))
val df0 = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df0.withColumn("col1",org.apache.spark.sql.functions.lit("batch1")).
withColumn("partitionpath",org.apache.spark.sql.functions.lit("path1")).
write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "col1").
option("hoodie.metadata.enable","false").
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
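// Verify row counts per col1 partition after the initial insert.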
spark.
read.
format("hudi").
load(basePath).createOrReplaceTempView("tbl1")
spark.sql("select col1, count(*) from tbl1 group by 1 order by 2 desc").show()
val updates = convertToStringList(dataGen.generateUniqueUpdates(1000))
val df1 = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df1.withColumn("col1",org.apache.spark.sql.functions.lit("batch1")).
withColumn("partitionpath",org.apache.spark.sql.functions.lit("path1")).
write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "col1").
option("hoodie.metadata.enable","false").
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
option("hoodie.index.type","GLOBAL_SIMPLE").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
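// Counts per partition should be unchanged if all updates hit existing records.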
spark.
read.
format("hudi").
load(basePath).createOrReplaceTempView("tbl2")
spark.sql("select col1, count(*) from tbl2 group by 1 order by 2 desc").show()
val updates = convertToStringList(dataGen.generateUniqueUpdates(1000))
val df1 = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df1.withColumn("col1",org.apache.spark.sql.functions.lit("batch2")).
withColumn("partitionpath",org.apache.spark.sql.functions.lit("path1")).
write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "col1").
option("hoodie.metadata.enable","false").
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
option("hoodie.index.type","GLOBAL_SIMPLE").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
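// Inspect how records are distributed across col1 partitions after the partition-path change.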
spark.
read.
format("hudi").
load(basePath).createOrReplaceTempView("tbl3")
spark.sql("select col1, count(*) from tbl3 group by 1 order by 2 desc").show()
val updates = convertToStringList(dataGen.generateUniqueUpdates(1000))
val df1 = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df1.withColumn("col1",org.apache.spark.sql.functions.lit("batch2")).
withColumn("partitionpath",org.apache.spark.sql.functions.lit("path1")).
write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "col1").
option("hoodie.metadata.enable","false").
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
option("hoodie.index.type","GLOBAL_SIMPLE").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
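// Check partition counts again, then look for any record keys that now appear more than once.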
spark.
read.
format("hudi").
load(basePath).createOrReplaceTempView("tbl4")
spark.sql("select col1, count(*) from tbl4 group by 1 order by 2 desc").show()
spark.sql("select * from (select uuid, count(*) as count from tbl4 group by 1) where count > 1").show()
val updates = convertToStringList(dataGen.generateUniqueUpdates(1000))
val df1 = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df1.withColumn("col1",org.apache.spark.sql.functions.lit("batch3")).
withColumn("partitionpath",org.apache.spark.sql.functions.lit("path1")).
write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "col1").
option("hoodie.metadata.enable","false").
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
option("hoodie.index.type","GLOBAL_SIMPLE").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
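// Final checks: partition counts, the duplicated keys themselves, and the total
// number of duplicated keys.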
spark.
read.
format("hudi").
load(basePath).createOrReplaceTempView("tbl5")
spark.sql("select col1, count(*) from tbl5 group by 1 order by 2 desc").show()
spark.sql("select * from (select uuid, count(*) as count from tbl5 group by 1) where count > 1").show()
spark.sql("select count(*) from (select uuid, count(*) as count from tbl5 group by 1) where count > 1").show()