@nsivabalan
Created July 30, 2023
./bin/spark-shell --packages org.apache.hudi:hudi-spark2.4-bundle_2.11:0.13.1 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.sql.functions._
val tableName = "hudi_trips_bq"
val basePath = "file:///tmp/hudi_trips_bq"
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.withColumn("batch_id",lit("batch1")).write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option("hoodie.datasource.write.hive_style_partitioning","true").
option("hoodie.partition.metafile.use.base.format","true").
option("hoodie.datasource.write.drop.partition.columns","true").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
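// optional sketch (hedged, not part of the original run): list the completed commit instants
// under .hoodie to cross-check the _hoodie_commit_time values seen in later queries; assumes
// the local file:/// base path used above and plain Hadoop FileSystem calls.
import org.apache.hadoop.fs.{FileSystem, Path}
val hadoopFs = FileSystem.get(new java.net.URI(basePath), spark.sparkContext.hadoopConfiguration)
hadoopFs.listStatus(new Path(basePath, ".hoodie")).
  map(_.getPath.getName).
  filter(_.endsWith(".commit")).
  foreach(println)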
spark.
read.
format("hudi").
load(basePath).createOrReplaceTempView("hudi_tbl")
spark.sql("describe hudi_tbl").show(100, false)
spark.sql("select partitionpath, uuid, batch_id, rider, ts, fare from hudi_tbl").show(100, false)
// more inserts.
val inserts = convertToStringList(dataGen.generateInserts(5))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 1))
df.withColumn("batch_id",lit("batch2")).write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option("hoodie.datasource.write.hive_style_partitioning","true").
option("hoodie.partition.metafile.use.base.format","true").
option("hoodie.datasource.write.drop.partition.columns","true").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
// 10 in batch1, 5 in batch2
spark.
read.
format("hudi").
load(basePath).createOrReplaceTempView("hudi_tbl1")
spark.sql("select partitionpath, uuid, batch_id, rider, ts, fare from hudi_tbl1 order by batch_id").show(100, false)
// count records per batch/commit
spark.sql("select batch_id, count(*) from hudi_tbl1 group by 1").show(100, false)
// group by commit time: the 5 batch2 records map to their own, second commit
spark.sql("select _hoodie_commit_time, batch_id, count(*) from hudi_tbl1 group by 1,2 ").show(100, false)
// update records
val updates = convertToStringList(dataGen.generateUniqueUpdates(4))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 1))
// printing the records to be upserted.
df.select("partitionPath","uuid","ts","rider","fare").show(5, false)
df.withColumn("batch_id",lit("batch3")).write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option("hoodie.datasource.write.hive_style_partitioning","true").
option("hoodie.partition.metafile.use.base.format","true").
option("hoodie.datasource.write.drop.partition.columns","true").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
spark.
read.
format("hudi").
load(basePath).createOrReplaceTempView("hudi_tbl2")
// only the 4 updated records carry batch_id = "batch3"; cross-check against the uuids printed above, and the changed rider/fare/ts values, to validate the updates.
spark.sql("select partitionpath, uuid, batch_id, rider, ts, fare from hudi_tbl2 order by batch_id").show(100, false)
// count records per batch/commit
spark.sql("select batch_id, count(*) from hudi_tbl2 group by 1").show(100, false)
// 11 records remain under batch1 and batch2 combined (7 + 4); the 4 updated records now show as batch3.
spark.sql("select _hoodie_commit_time, batch_id, count(*) from hudi_tbl2 group by 1,2 ").show(100, false)
// deletes
val deletes = convertToStringList(dataGen.generateUniqueUpdates(2))
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 1))
// printing the records to be deleted.
df.select("partitionPath","uuid","ts","rider","fare").show(5, false)
val toDeleteDf = df
toDeleteDf.createOrReplaceTempView("toDelete_tbl")
// count total records before delete
spark.sql("select count(*) from hudi_tbl2").show(false)
df.withColumn("batch_id",lit("batch4")).write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option("hoodie.datasource.write.hive_style_partitioning","true").
option("hoodie.partition.metafile.use.base.format","true").
option("hoodie.datasource.write.drop.partition.columns","true").
option("hoodie.datasource.write.operation","delete").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
spark.
read.
format("hudi").
load(basePath).createOrReplaceTempView("hudi_tbl3")
spark.sql("select partitionpath, uuid, batch_id, rider, ts, fare from hudi_tbl3 order by batch_id").show(100, false)
// ensure the total count dropped by 2 (15 -> 13)
spark.sql("select count(*) from hudi_tbl3").show(false)
// count records per batch/commit
spark.sql("select batch_id, count(*) from hudi_tbl3 group by 1").show(100, false)
// verify that no records carry batch_id = "batch4": the delete operation removes the matching rows instead of writing them.
spark.sql("select _hoodie_commit_time, batch_id, count(*) from hudi_tbl3 group by 1,2 ").show(100, false)
// verify that none of the keys from toDelete_tbl remain in the table
spark.sql("select count(*) from hudi_tbl3 where uuid in (select uuid from toDelete_tbl)").show()
// soft deletes
val softDeleteDs = spark.sql("select * from hudi_tbl3").limit(2)
// prepare the soft deletes by ensuring the appropriate fields are nullified
val nullifyColumns = softDeleteDs.schema.fields.
map(field => (field.name, field.dataType.typeName)).
filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1)
&& !Array("ts", "uuid", "partitionpath").contains(pair._1)))
val softDeleteDf = nullifyColumns.
foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))(
(ds, col) => ds.withColumn(col._1, lit(null).cast(col._2)))
// verify that every column other than partitionpath, uuid and ts is nullified.
softDeleteDf.show(2, false)
softDeleteDf.createOrReplaceTempView("softDelete_tbl")
// count the total records before soft deletes
spark.sql("select count(*) from hudi_tbl3").show()
// note: the write below still sets hoodie.datasource.write.operation to delete, so these two
// rows are removed outright and the total count drops by 2. a true soft delete upserts the
// nulled rows instead, preserving the keys; see the sketch at the end of this script.
softDeleteDf.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option("hoodie.datasource.write.hive_style_partitioning","true").
option("hoodie.partition.metafile.use.base.format","true").
option("hoodie.datasource.write.drop.partition.columns","true").
option("hoodie.datasource.write.operation","delete").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
spark.
read.
format("hudi").
load(basePath).createOrReplaceTempView("hudi_tbl4")
spark.sql("select partitionpath, uuid, batch_id, rider, ts, fare from hudi_tbl4 order by batch_id").show(100, false)
// count records per batch/commit
spark.sql("select batch_id, count(*) from hudi_tbl4 group by 1").show(100, false)
// the two deleted rows came from batch1, so its count drops from 6 to 4.
spark.sql("select _hoodie_commit_time, batch_id, count(*) from hudi_tbl4 group by 1,2 ").show(100, false)
// verify that the total count dropped by 2 compared to the pre-delete count (13 -> 11)
spark.sql("select count(*) from hudi_tbl4").show()
// verify that we don't find any records from softDelete_tbl
spark.sql("select count(*) from hudi_tbl4 where uuid in (select uuid from softDelete_tbl)").show()
scala> import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.QuickstartUtils._
scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._
scala> import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SaveMode._
scala> import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceReadOptions._
scala> import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceWriteOptions._
scala> import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieWriteConfig._
scala> import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.model.HoodieRecord
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala>
scala> val tableName = "hudi_trips_bq"
tableName: String = hudi_trips_bq
scala> val basePath = "file:///tmp/hudi_trips_bq"
basePath: String = file:///tmp/hudi_trips_bq
scala> val dataGen = new DataGenerator
dataGen: org.apache.hudi.QuickstartUtils.DataGenerator = org.apache.hudi.QuickstartUtils$DataGenerator@49ce0667
scala>
scala> val inserts = convertToStringList(dataGen.generateInserts(10))
inserts: java.util.List[String] = [{"ts": 1690177939963, "uuid": "9d03d7d4-ed08-4189-a8aa-39c49415d8cb", "rider": "rider-668", "driver": "driver-668", "begin_lat": 0.13625652434397972, "begin_lon": 0.621688297381891, "end_lat": 0.7970646814292182, "end_lon": 0.3852529405012508, "fare": 69.36363684236434, "partitionpath": "americas/united_states/san_francisco"}, {"ts": 1690609944610, "uuid": "c025d239-70b0-4a22-ac0b-a9502c83cd61", "rider": "rider-668", "driver": "driver-668", "begin_lat": 0.9115819084017496, "begin_lon": 0.8414360533180016, "end_lat": 0.6105448818666265, "end_lon": 0.8006778279930239, "fare": 62.79408654844148, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 169...
scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
warning: there was one deprecation warning; re-run with -deprecation for details
df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields]
scala> df.withColumn("batch_id",lit("batch1")).write.format("hudi").
| options(getQuickstartWriteConfigs).
| option(PRECOMBINE_FIELD_OPT_KEY, "ts").
| option(RECORDKEY_FIELD_OPT_KEY, "uuid").
| option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
| option("hoodie.datasource.write.hive_style_partitioning","true").
| option("hoodie.partition.metafile.use.base.format","true").
| option("hoodie.datasource.write.drop.partition.columns","true").
| option(TABLE_NAME, tableName).
| mode(Overwrite).
| save(basePath)
warning: there was one deprecation warning; re-run with -deprecation for details
23/07/29 20:52:31 WARN HoodieSparkSqlWriter$: hoodie table at file:/tmp/hudi_trips_bq already exists. Deleting existing data & overwriting with new data.
23/07/29 20:52:31 WARN HoodieBackedTableMetadata: Metadata table was not found at path file:/tmp/hudi_trips_bq/.hoodie/metadata
scala>
scala>
scala> spark.
| read.
| format("hudi").
| load(basePath).createOrReplaceTempView("hudi_tbl")
scala>
scala> spark.sql("describe hudi_tbl").show(100, false)
+----------------------+---------+-------+
|col_name |data_type|comment|
+----------------------+---------+-------+
|_hoodie_commit_time |string |null |
|_hoodie_commit_seqno |string |null |
|_hoodie_record_key |string |null |
|_hoodie_partition_path|string |null |
|_hoodie_file_name |string |null |
|begin_lat |double |null |
|begin_lon |double |null |
|driver |string |null |
|end_lat |double |null |
|end_lon |double |null |
|fare |double |null |
|rider |string |null |
|ts |bigint |null |
|uuid |string |null |
|batch_id |string |null |
|partitionpath |string |null |
+----------------------+---------+-------+
scala>
scala> spark.sql("select partitionpath, uuid, batch_id, rider, ts, fare from hudi_tbl").show(100, false)
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+
|partitionpath |uuid |batch_id|rider |ts |fare |
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+
|americas/united_states/san_francisco|7e74bcf3-ccd1-43e3-a428-6132a3a835bf|batch1 |rider-668|1690508884178|21.39416477932461 |
|americas/united_states/san_francisco|1c6f9211-81c9-4fd5-b10c-c07b591dae7b|batch1 |rider-668|1690299840167|41.57780462795554 |
|americas/united_states/san_francisco|546d5cc4-14c3-4505-b3c8-9c49efe6b349|batch1 |rider-668|1690401346218|49.55662199969524 |
|americas/united_states/san_francisco|d73df5f0-7feb-4d2b-bb91-1c3e6fd8380d|batch1 |rider-668|1690111294971|60.57368782999865 |
|americas/united_states/san_francisco|9d03d7d4-ed08-4189-a8aa-39c49415d8cb|batch1 |rider-668|1690177939963|69.36363684236434 |
|americas/united_states/san_francisco|4409ddd3-c79b-4587-865a-c534c15ea56b|batch1 |rider-668|1690536465531|94.67524853429937 |
|americas/brazil/sao_paulo |104c4af1-74ec-40d5-af58-c59edf06925c|batch1 |rider-668|1690235686261|57.68639743536661 |
|americas/brazil/sao_paulo |c025d239-70b0-4a22-ac0b-a9502c83cd61|batch1 |rider-668|1690609944610|62.79408654844148 |
|americas/brazil/sao_paulo |a6219312-cb78-4bd7-8978-177ba783c67f|batch1 |rider-668|1690474005794|25.667476181356484|
|asia/india/chennai |ab32477b-401f-4659-9f3d-93a211ccc067|batch1 |rider-668|1690301176702|91.77262409248691 |
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+
scala>
scala>
scala> // more inserts.
scala>
scala> val inserts = convertToStringList(dataGen.generateInserts(5))
inserts: java.util.List[String] = [{"ts": 1690350229349, "uuid": "de84a55a-5761-4fc3-9088-3f930e88da03", "rider": "rider-048", "driver": "driver-048", "begin_lat": 0.7004607721204296, "begin_lon": 0.9911546157239198, "end_lat": 0.14602911545960373, "end_lon": 0.430070203188727, "fare": 89.89115113672493, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 1690305541919, "uuid": "48170c31-f785-4685-a586-07a80aa3656e", "rider": "rider-048", "driver": "driver-048", "begin_lat": 0.011933855867048981, "begin_lon": 0.16258177392270334, "end_lat": 0.9635314017496284, "end_lon": 0.6451866124948767, "fare": 69.09535493302582, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 1690163733693, "uuid":...
scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 1))
warning: there was one deprecation warning; re-run with -deprecation for details
df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields]
scala>
scala> df.withColumn("batch_id",lit("batch2")).write.format("hudi").
| options(getQuickstartWriteConfigs).
| option(PRECOMBINE_FIELD_OPT_KEY, "ts").
| option(RECORDKEY_FIELD_OPT_KEY, "uuid").
| option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
| option("hoodie.datasource.write.hive_style_partitioning","true").
| option("hoodie.partition.metafile.use.base.format","true").
| option("hoodie.datasource.write.drop.partition.columns","true").
| option(TABLE_NAME, tableName).
| mode(Append).
| save(basePath)
warning: there was one deprecation warning; re-run with -deprecation for details
scala>
scala> // 10 in batch1, 5 in batch2
scala>
scala> spark.
| read.
| format("hudi").
| load(basePath).createOrReplaceTempView("hudi_tbl1")
scala>
scala> spark.sql("select partitionpath, uuid, batch_id, rider, ts, fare from hudi_tbl1 order by batch_id").show(100, false)
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+
|partitionpath |uuid |batch_id|rider |ts |fare |
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+
|americas/united_states/san_francisco|7e74bcf3-ccd1-43e3-a428-6132a3a835bf|batch1 |rider-668|1690508884178|21.39416477932461 |
|americas/united_states/san_francisco|d73df5f0-7feb-4d2b-bb91-1c3e6fd8380d|batch1 |rider-668|1690111294971|60.57368782999865 |
|americas/united_states/san_francisco|9d03d7d4-ed08-4189-a8aa-39c49415d8cb|batch1 |rider-668|1690177939963|69.36363684236434 |
|americas/united_states/san_francisco|4409ddd3-c79b-4587-865a-c534c15ea56b|batch1 |rider-668|1690536465531|94.67524853429937 |
|americas/united_states/san_francisco|1c6f9211-81c9-4fd5-b10c-c07b591dae7b|batch1 |rider-668|1690299840167|41.57780462795554 |
|asia/india/chennai |ab32477b-401f-4659-9f3d-93a211ccc067|batch1 |rider-668|1690301176702|91.77262409248691 |
|americas/united_states/san_francisco|546d5cc4-14c3-4505-b3c8-9c49efe6b349|batch1 |rider-668|1690401346218|49.55662199969524 |
|americas/brazil/sao_paulo |c025d239-70b0-4a22-ac0b-a9502c83cd61|batch1 |rider-668|1690609944610|62.79408654844148 |
|americas/brazil/sao_paulo |104c4af1-74ec-40d5-af58-c59edf06925c|batch1 |rider-668|1690235686261|57.68639743536661 |
|americas/brazil/sao_paulo |a6219312-cb78-4bd7-8978-177ba783c67f|batch1 |rider-668|1690474005794|25.667476181356484|
|asia/india/chennai |ec73d529-319b-4558-8a70-1eac41e31414|batch2 |rider-048|1690294010867|51.299844734112945|
|americas/brazil/sao_paulo |48170c31-f785-4685-a586-07a80aa3656e|batch2 |rider-048|1690305541919|69.09535493302582 |
|asia/india/chennai |81647218-69c0-4c04-b804-17567b6a45b5|batch2 |rider-048|1690101668285|96.4500716154594 |
|americas/brazil/sao_paulo |9e7fe1f1-4d45-418f-afe0-1557bd74e8dc|batch2 |rider-048|1690163733693|17.63327305991089 |
|americas/brazil/sao_paulo |de84a55a-5761-4fc3-9088-3f930e88da03|batch2 |rider-048|1690350229349|89.89115113672493 |
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+
scala>
scala> // count records per batch/commit
scala> spark.sql("select batch_id, count(*) from hudi_tbl1 group by 1").show(100, false)
+--------+--------+
|batch_id|count(1)|
+--------+--------+
|batch1 |10 |
|batch2 |5 |
+--------+--------+
scala>
scala> // we can verify that 5 of them belong to batch2
scala> spark.sql("select _hoodie_commit_time, batch_id, count(*) from hudi_tbl1 group by 1,2 ").show(100, false)
+-------------------+--------+--------+
|_hoodie_commit_time|batch_id|count(1)|
+-------------------+--------+--------+
|20230729205231238 |batch1 |10 |
|20230729205235432 |batch2 |5 |
+-------------------+--------+--------+
scala>
scala>
scala> // update records
scala> val updates = convertToStringList(dataGen.generateUniqueUpdates(4))
updates: java.util.List[String] = [{"ts": 1690655608668, "uuid": "1c6f9211-81c9-4fd5-b10c-c07b591dae7b", "rider": "rider-837", "driver": "driver-837", "begin_lat": 0.3533375512548106, "begin_lon": 0.25216729525590675, "end_lat": 0.6372033416112669, "end_lon": 0.5844540681855337, "fare": 80.517971387309, "partitionpath": "americas/united_states/san_francisco"}, {"ts": 1690178136769, "uuid": "4409ddd3-c79b-4587-865a-c534c15ea56b", "rider": "rider-837", "driver": "driver-837", "begin_lat": 0.7230615673273981, "begin_lon": 0.3608052937112112, "end_lat": 0.8250750215077437, "end_lon": 0.030241465200331774, "fare": 74.7341018556108, "partitionpath": "americas/united_states/san_francisco"}, ...
scala> val df = spark.read.json(spark.sparkContext.parallelize(updates, 1))
warning: there was one deprecation warning; re-run with -deprecation for details
df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields]
scala>
scala> // printing the records to be upserted.
scala> df.select("partitionPath","uuid","ts","rider","fare").show(5, false)
+------------------------------------+------------------------------------+-------------+---------+-----------------+
|partitionPath |uuid |ts |rider |fare |
+------------------------------------+------------------------------------+-------------+---------+-----------------+
|americas/united_states/san_francisco|1c6f9211-81c9-4fd5-b10c-c07b591dae7b|1690655608668|rider-837|80.517971387309 |
|americas/united_states/san_francisco|4409ddd3-c79b-4587-865a-c534c15ea56b|1690178136769|rider-837|74.7341018556108 |
|americas/united_states/san_francisco|9d03d7d4-ed08-4189-a8aa-39c49415d8cb|1690309939155|rider-837|16.13290418880422|
|americas/brazil/sao_paulo |9e7fe1f1-4d45-418f-afe0-1557bd74e8dc|1690236576603|rider-837|6.860897540622235|
+------------------------------------+------------------------------------+-------------+---------+-----------------+
scala>
scala> df.withColumn("batch_id",lit("batch3")).write.format("hudi").
| options(getQuickstartWriteConfigs).
| option(PRECOMBINE_FIELD_OPT_KEY, "ts").
| option(RECORDKEY_FIELD_OPT_KEY, "uuid").
| option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
| option("hoodie.datasource.write.hive_style_partitioning","true").
| option("hoodie.partition.metafile.use.base.format","true").
| option("hoodie.datasource.write.drop.partition.columns","true").
| option(TABLE_NAME, tableName).
| mode(Append).
| save(basePath)
warning: there was one deprecation warning; re-run with -deprecation for details
scala>
scala>
scala> spark.
| read.
| format("hudi").
| load(basePath).createOrReplaceTempView("hudi_tbl2")
scala>
scala> // only the 4 updated records carry batch_id = "batch3"; cross-check against the uuids printed above, and the changed rider/fare/ts values, to validate the updates.
scala>
scala> spark.sql("select partitionpath, uuid, batch_id, rider, ts, fare from hudi_tbl2 order by batch_id").show(100, false)
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+
|partitionpath |uuid |batch_id|rider |ts |fare |
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+
|asia/india/chennai |ab32477b-401f-4659-9f3d-93a211ccc067|batch1 |rider-668|1690301176702|91.77262409248691 |
|americas/brazil/sao_paulo |c025d239-70b0-4a22-ac0b-a9502c83cd61|batch1 |rider-668|1690609944610|62.79408654844148 |
|americas/brazil/sao_paulo |104c4af1-74ec-40d5-af58-c59edf06925c|batch1 |rider-668|1690235686261|57.68639743536661 |
|americas/brazil/sao_paulo |a6219312-cb78-4bd7-8978-177ba783c67f|batch1 |rider-668|1690474005794|25.667476181356484|
|americas/united_states/san_francisco|546d5cc4-14c3-4505-b3c8-9c49efe6b349|batch1 |rider-668|1690401346218|49.55662199969524 |
|americas/united_states/san_francisco|7e74bcf3-ccd1-43e3-a428-6132a3a835bf|batch1 |rider-668|1690508884178|21.39416477932461 |
|americas/united_states/san_francisco|d73df5f0-7feb-4d2b-bb91-1c3e6fd8380d|batch1 |rider-668|1690111294971|60.57368782999865 |
|asia/india/chennai |81647218-69c0-4c04-b804-17567b6a45b5|batch2 |rider-048|1690101668285|96.4500716154594 |
|americas/brazil/sao_paulo |48170c31-f785-4685-a586-07a80aa3656e|batch2 |rider-048|1690305541919|69.09535493302582 |
|asia/india/chennai |ec73d529-319b-4558-8a70-1eac41e31414|batch2 |rider-048|1690294010867|51.299844734112945|
|americas/brazil/sao_paulo |de84a55a-5761-4fc3-9088-3f930e88da03|batch2 |rider-048|1690350229349|89.89115113672493 |
|americas/brazil/sao_paulo |9e7fe1f1-4d45-418f-afe0-1557bd74e8dc|batch3 |rider-837|1690236576603|6.860897540622235 |
|americas/united_states/san_francisco|1c6f9211-81c9-4fd5-b10c-c07b591dae7b|batch3 |rider-837|1690655608668|80.517971387309 |
|americas/united_states/san_francisco|4409ddd3-c79b-4587-865a-c534c15ea56b|batch3 |rider-837|1690178136769|74.7341018556108 |
|americas/united_states/san_francisco|9d03d7d4-ed08-4189-a8aa-39c49415d8cb|batch3 |rider-837|1690309939155|16.13290418880422 |
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+
scala>
scala> // count records per batch/commit
scala> spark.sql("select batch_id, count(*) from hudi_tbl2 group by 1").show(100, false)
+--------+--------+
|batch_id|count(1)|
+--------+--------+
|batch1 |7 |
|batch2 |4 |
|batch3 |4 |
+--------+--------+
scala>
scala> // 11 records remain under batch1 and batch2 combined (7 + 4); the 4 updated records now show as batch3.
scala> spark.sql("select _hoodie_commit_time, batch_id, count(*) from hudi_tbl2 group by 1,2 ").show(100, false)
+-------------------+--------+--------+
|_hoodie_commit_time|batch_id|count(1)|
+-------------------+--------+--------+
|20230729205240123 |batch3 |4 |
|20230729205231238 |batch1 |7 |
|20230729205235432 |batch2 |4 |
+-------------------+--------+--------+
scala>
scala>
scala>
scala>
scala> // deletes
scala>
scala> val deletes = convertToStringList(dataGen.generateUniqueUpdates(2))
deletes: java.util.List[String] = [{"ts": 1690586933363, "uuid": "9d03d7d4-ed08-4189-a8aa-39c49415d8cb", "rider": "rider-707", "driver": "driver-707", "begin_lat": 0.8183759067610148, "begin_lon": 0.5936482561544364, "end_lat": 0.19655483974779764, "end_lon": 0.29594364305313803, "fare": 55.51725492240046, "partitionpath": "americas/united_states/san_francisco"}, {"ts": 1690135701138, "uuid": "ab32477b-401f-4659-9f3d-93a211ccc067", "rider": "rider-707", "driver": "driver-707", "begin_lat": 0.5488633090101539, "begin_lon": 0.9785314544611191, "end_lat": 0.13644545265446528, "end_lon": 0.7338703063422535, "fare": 8.046423637086441, "partitionpath": "asia/india/chennai"}]
scala> val df = spark.read.json(spark.sparkContext.parallelize(deletes, 1))
warning: there was one deprecation warning; re-run with -deprecation for details
df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields]
scala>
scala> // printing the records to be deleted.
scala> df.select("partitionPath","uuid","ts","rider","fare").show(5, false)
+------------------------------------+------------------------------------+-------------+---------+-----------------+
|partitionPath |uuid |ts |rider |fare |
+------------------------------------+------------------------------------+-------------+---------+-----------------+
|americas/united_states/san_francisco|9d03d7d4-ed08-4189-a8aa-39c49415d8cb|1690586933363|rider-707|55.51725492240046|
|asia/india/chennai |ab32477b-401f-4659-9f3d-93a211ccc067|1690135701138|rider-707|8.046423637086441|
+------------------------------------+------------------------------------+-------------+---------+-----------------+
scala> val toDeleteDf = df
toDeleteDf: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields]
scala> toDeleteDf.createOrReplaceTempView("toDelete_tbl")
scala>
scala> // count total records before delete
scala>
scala> spark.sql("select count(*) from hudi_tbl2").show(false)
+--------+
|count(1)|
+--------+
|15 |
+--------+
scala>
scala> df.withColumn("batch_id",lit("batch4")).write.format("hudi").
| options(getQuickstartWriteConfigs).
| option(PRECOMBINE_FIELD_OPT_KEY, "ts").
| option(RECORDKEY_FIELD_OPT_KEY, "uuid").
| option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
| option("hoodie.datasource.write.hive_style_partitioning","true").
| option("hoodie.partition.metafile.use.base.format","true").
| option("hoodie.datasource.write.drop.partition.columns","true").
| option("hoodie.datasource.write.operation","delete").
| option(TABLE_NAME, tableName).
| mode(Append).
| save(basePath)
warning: there was one deprecation warning; re-run with -deprecation for details
scala>
scala>
scala> spark.
| read.
| format("hudi").
| load(basePath).createOrReplaceTempView("hudi_tbl3")
scala>
scala> spark.sql("select partitionpath, uuid, batch_id, rider, ts, fare from hudi_tbl3 order by batch_id").show(100, false)
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+
|partitionpath |uuid |batch_id|rider |ts |fare |
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+
|americas/united_states/san_francisco|546d5cc4-14c3-4505-b3c8-9c49efe6b349|batch1 |rider-668|1690401346218|49.55662199969524 |
|americas/united_states/san_francisco|7e74bcf3-ccd1-43e3-a428-6132a3a835bf|batch1 |rider-668|1690508884178|21.39416477932461 |
|americas/united_states/san_francisco|d73df5f0-7feb-4d2b-bb91-1c3e6fd8380d|batch1 |rider-668|1690111294971|60.57368782999865 |
|americas/brazil/sao_paulo |c025d239-70b0-4a22-ac0b-a9502c83cd61|batch1 |rider-668|1690609944610|62.79408654844148 |
|americas/brazil/sao_paulo |104c4af1-74ec-40d5-af58-c59edf06925c|batch1 |rider-668|1690235686261|57.68639743536661 |
|americas/brazil/sao_paulo |a6219312-cb78-4bd7-8978-177ba783c67f|batch1 |rider-668|1690474005794|25.667476181356484|
|asia/india/chennai |ec73d529-319b-4558-8a70-1eac41e31414|batch2 |rider-048|1690294010867|51.299844734112945|
|americas/brazil/sao_paulo |48170c31-f785-4685-a586-07a80aa3656e|batch2 |rider-048|1690305541919|69.09535493302582 |
|asia/india/chennai |81647218-69c0-4c04-b804-17567b6a45b5|batch2 |rider-048|1690101668285|96.4500716154594 |
|americas/brazil/sao_paulo |de84a55a-5761-4fc3-9088-3f930e88da03|batch2 |rider-048|1690350229349|89.89115113672493 |
|americas/united_states/san_francisco|1c6f9211-81c9-4fd5-b10c-c07b591dae7b|batch3 |rider-837|1690655608668|80.517971387309 |
|americas/united_states/san_francisco|4409ddd3-c79b-4587-865a-c534c15ea56b|batch3 |rider-837|1690178136769|74.7341018556108 |
|americas/brazil/sao_paulo |9e7fe1f1-4d45-418f-afe0-1557bd74e8dc|batch3 |rider-837|1690236576603|6.860897540622235 |
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+
scala>
scala> // ensure total count reduced by 2
scala> spark.sql("select count(*) from hudi_tbl3").show(false)
+--------+
|count(1)|
+--------+
|13 |
+--------+
scala>
scala> // count records per batch/commit
scala> spark.sql("select batch_id, count(*) from hudi_tbl3 group by 1").show(100, false)
+--------+--------+
|batch_id|count(1)|
+--------+--------+
|batch1 |6 |
|batch2 |4 |
|batch3 |3 |
+--------+--------+
scala>
scala> // verify that no records carry batch_id = "batch4": the delete operation removed the matching rows instead of writing them.
scala> spark.sql("select _hoodie_commit_time, batch_id, count(*) from hudi_tbl3 group by 1,2 ").show(100, false)
+-------------------+--------+--------+
|_hoodie_commit_time|batch_id|count(1)|
+-------------------+--------+--------+
|20230729205240123 |batch3 |3 |
|20230729205231238 |batch1 |6 |
|20230729205235432 |batch2 |4 |
+-------------------+--------+--------+
scala>
scala> // verify that none of the keys from toDelete_tbl remain in the table
scala> spark.sql("select count(*) from hudi_tbl3 where uuid in (select uuid from toDelete_tbl)").show()
+--------+
|count(1)|
+--------+
| 0|
+--------+
scala>
scala>
scala>
scala>
scala>
scala>
scala>
scala> // soft deletes
scala> val softDeleteDs = spark.sql("select * from hudi_tbl3").limit(2)
softDeleteDs: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 14 more fields]
scala>
scala> // prepare the soft deletes by ensuring the appropriate fields are nullified
scala> val nullifyColumns = softDeleteDs.schema.fields.
| map(field => (field.name, field.dataType.typeName)).
| filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1)
| && !Array("ts", "uuid", "partitionpath").contains(pair._1)))
nullifyColumns: Array[(String, String)] = Array((begin_lat,double), (begin_lon,double), (driver,string), (end_lat,double), (end_lon,double), (fare,double), (rider,string), (batch_id,string))
scala>
scala> val softDeleteDf = nullifyColumns.
| foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))(
| (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2)))
softDeleteDf: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 9 more fields]
scala>
scala> // verify that every column other than partitionpath, uuid and ts is nullified.
scala> softDeleteDf.show(2, false)
+---------+---------+------+-------+-------+----+-----+-------------+------------------------------------+--------+-------------------------+
|begin_lat|begin_lon|driver|end_lat|end_lon|fare|rider|ts |uuid |batch_id|partitionpath |
+---------+---------+------+-------+-------+----+-----+-------------+------------------------------------+--------+-------------------------+
|null |null |null |null |null |null|null |1690235686261|104c4af1-74ec-40d5-af58-c59edf06925c|null |americas/brazil/sao_paulo|
|null |null |null |null |null |null|null |1690609944610|c025d239-70b0-4a22-ac0b-a9502c83cd61|null |americas/brazil/sao_paulo|
+---------+---------+------+-------+-------+----+-----+-------------+------------------------------------+--------+-------------------------+
scala>
scala> softDeleteDf.createOrReplaceTempView("softDelete_tbl")
scala>
scala> // count the total records before soft deletes
scala>
scala> spark.sql("select count(*) from hudi_tbl3").show()
+--------+
|count(1)|
+--------+
| 13|
+--------+
scala>
scala> // note: the write below still sets the operation to delete, so these rows are removed outright rather than soft deleted
scala> softDeleteDf.write.format("hudi").
| options(getQuickstartWriteConfigs).
| option(PRECOMBINE_FIELD_OPT_KEY, "ts").
| option(RECORDKEY_FIELD_OPT_KEY, "uuid").
| option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
| option("hoodie.datasource.write.hive_style_partitioning","true").
| option("hoodie.partition.metafile.use.base.format","true").
| option("hoodie.datasource.write.drop.partition.columns","true").
| option("hoodie.datasource.write.operation","delete").
| option(TABLE_NAME, tableName).
| mode(Append).
| save(basePath)
warning: there was one deprecation warning; re-run with -deprecation for details
scala>
scala>
scala> spark.
| read.
| format("hudi").
| load(basePath).createOrReplaceTempView("hudi_tbl4")
scala>
scala> spark.sql("select partitionpath, uuid, batch_id, rider, ts, fare from hudi_tbl4 order by batch_id").show(100, false)
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+
|partitionpath |uuid |batch_id|rider |ts |fare |
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+
|americas/brazil/sao_paulo |a6219312-cb78-4bd7-8978-177ba783c67f|batch1 |rider-668|1690474005794|25.667476181356484|
|americas/united_states/san_francisco|d73df5f0-7feb-4d2b-bb91-1c3e6fd8380d|batch1 |rider-668|1690111294971|60.57368782999865 |
|americas/united_states/san_francisco|546d5cc4-14c3-4505-b3c8-9c49efe6b349|batch1 |rider-668|1690401346218|49.55662199969524 |
|americas/united_states/san_francisco|7e74bcf3-ccd1-43e3-a428-6132a3a835bf|batch1 |rider-668|1690508884178|21.39416477932461 |
|americas/brazil/sao_paulo |48170c31-f785-4685-a586-07a80aa3656e|batch2 |rider-048|1690305541919|69.09535493302582 |
|americas/brazil/sao_paulo |de84a55a-5761-4fc3-9088-3f930e88da03|batch2 |rider-048|1690350229349|89.89115113672493 |
|asia/india/chennai |ec73d529-319b-4558-8a70-1eac41e31414|batch2 |rider-048|1690294010867|51.299844734112945|
|asia/india/chennai |81647218-69c0-4c04-b804-17567b6a45b5|batch2 |rider-048|1690101668285|96.4500716154594 |
|americas/brazil/sao_paulo |9e7fe1f1-4d45-418f-afe0-1557bd74e8dc|batch3 |rider-837|1690236576603|6.860897540622235 |
|americas/united_states/san_francisco|1c6f9211-81c9-4fd5-b10c-c07b591dae7b|batch3 |rider-837|1690655608668|80.517971387309 |
|americas/united_states/san_francisco|4409ddd3-c79b-4587-865a-c534c15ea56b|batch3 |rider-837|1690178136769|74.7341018556108 |
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+
scala>
scala> // count records per batch/commit
scala> spark.sql("select batch_id, count(*) from hudi_tbl4 group by 1").show(100, false)
+--------+--------+
|batch_id|count(1)|
+--------+--------+
|batch1 |4 |
|batch2 |4 |
|batch3 |3 |
+--------+--------+
scala>
scala> // the two deleted rows came from batch1, so its count drops from 6 to 4.
scala> spark.sql("select _hoodie_commit_time, batch_id, count(*) from hudi_tbl4 group by 1,2 ").show(100, false)
+-------------------+--------+--------+
|_hoodie_commit_time|batch_id|count(1)|
+-------------------+--------+--------+
|20230729205240123 |batch3 |3 |
|20230729205231238 |batch1 |4 |
|20230729205235432 |batch2 |4 |
+-------------------+--------+--------+
scala>
scala> // verify that total count is reduced by 2 compared to the total record count we found before
scala> spark.sql("select count(*) from hudi_tbl4").show()
+--------+
|count(1)|
+--------+
| 11|
+--------+
scala>
scala>
scala> // verify that we don't find any records from softDelete_tbl
scala> spark.sql("select count(*) from hudi_tbl4 where uuid in (select uuid from softDelete_tbl)").show()
+--------+
|count(1)|
+--------+
| 0|
+--------+