Hudi 0.13.1 on Spark 2.4: a quickstart-style walkthrough of inserts, updates, hard deletes, and soft deletes against a local table (file:///tmp/hudi_trips_bq), followed by the spark-shell transcript of the run. Created July 30, 2023.
./bin/spark-shell --packages org.apache.hudi:hudi-spark2.4-bundle_2.11:0.13.1 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' | |
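// Launches spark-shell with the Hudi 0.13.1 bundle for Spark 2.4 / Scala 2.11, Kryo
// serialization, and Hudi's Spark SQL session extension, as in the Hudi quickstart.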
import org.apache.hudi.QuickstartUtils._ | |
import scala.collection.JavaConversions._ | |
import org.apache.spark.sql.SaveMode._ | |
import org.apache.hudi.DataSourceReadOptions._ | |
import org.apache.hudi.DataSourceWriteOptions._ | |
import org.apache.hudi.config.HoodieWriteConfig._ | |
import org.apache.hudi.common.model.HoodieRecord | |
import org.apache.spark.sql.functions._ | |
val tableName = "hudi_trips_bq" | |
val basePath = "file:///tmp/hudi_trips_bq" | |
val dataGen = new DataGenerator | |
val inserts = convertToStringList(dataGen.generateInserts(10)) | |
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) | |
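// QuickstartUtils' DataGenerator emits trip records with a uuid key, a ts field (used
// below as the precombine field), rider/driver/fare attributes, and a partitionpath;
// convertToStringList turns them into JSON strings so spark.read.json can infer a schema.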
df.withColumn("batch_id",lit("batch1")).write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | |
option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | |
option("hoodie.datasource.write.hive_style_partitioning","true"). | |
option("hoodie.partition.metafile.use.base.format","true"). | |
option("hoodie.datasource.write.drop.partition.columns","true"). | |
option(TABLE_NAME, tableName). | |
mode(Overwrite). | |
save(basePath) | |
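// The write above uses the default upsert operation; Overwrite mode (re)creates the table.
// The three extra options: hive_style_partitioning writes partition folders as
// <partition_field>=<value>; hoodie.partition.metafile.use.base.format stores the
// per-partition metadata file in the base file format (parquet) instead of a properties
// file; drop.partition.columns leaves the partition column out of the data files.
// The latter two are typically set for external readers such as BigQuery, which the
// table name hints at.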
spark. | |
read. | |
format("hudi"). | |
load(basePath).createOrReplaceTempView("hudi_tbl") | |
spark.sql("describe hudi_tbl").show(100, false) | |
spark.sql("select partitionpath, uuid, batch_id, rider, ts, fare from hudi_tbl").show(100, false) | |
// more inserts. | |
val inserts = convertToStringList(dataGen.generateInserts(5)) | |
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 1)) | |
df.withColumn("batch_id",lit("batch2")).write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | |
option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | |
option("hoodie.datasource.write.hive_style_partitioning","true"). | |
option("hoodie.partition.metafile.use.base.format","true"). | |
option("hoodie.datasource.write.drop.partition.columns","true"). | |
option(TABLE_NAME, tableName). | |
mode(Append). | |
save(basePath) | |
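// Append mode with the default upsert operation; these 5 uuids are new, so they land as
// inserts alongside the original 10 records.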
// 10 in batch1, 5 in batch2 | |
spark. | |
read. | |
format("hudi"). | |
load(basePath).createOrReplaceTempView("hudi_tbl1") | |
spark.sql("select partitionpath, uuid, batch_id, rider, ts, fare from hudi_tbl1 order by batch_id").show(100, false) | |
// count records per batch/commit | |
spark.sql("select batch_id, count(*) from hudi_tbl1 group by 1").show(100, false) | |
// group by commit time: 10 records belong to batch1's commit and 5 to batch2's commit | |
spark.sql("select _hoodie_commit_time, batch_id, count(*) from hudi_tbl1 group by 1,2 ").show(100, false) | |
// update records | |
val updates = convertToStringList(dataGen.generateUniqueUpdates(4)) | |
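// generateUniqueUpdates(4) produces new payloads for 4 distinct keys that already exist
// in the table, so the following write updates those records in place.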
val df = spark.read.json(spark.sparkContext.parallelize(updates, 1)) | |
// printing the records to be upserted. | |
df.select("partitionPath","uuid","ts","rider","fare").show(5, false) | |
df.withColumn("batch_id",lit("batch3")).write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | |
option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | |
option("hoodie.datasource.write.hive_style_partitioning","true"). | |
option("hoodie.partition.metafile.use.base.format","true"). | |
option("hoodie.datasource.write.drop.partition.columns","true"). | |
option(TABLE_NAME, tableName). | |
mode(Append). | |
save(basePath) | |
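// Upsert matches incoming rows to existing records by record key (uuid), deduplicating
// the incoming batch on the precombine field (ts), so the 4 records above are rewritten
// with their new rider/fare/ts values under the batch3 commit.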
spark. | |
read. | |
format("hudi"). | |
load(basePath).createOrReplaceTempView("hudi_tbl2") | |
// only 4 records should carry batch_id = "batch3"; their uuids match the ones printed above. Check the rider, fare, or batch_id values to confirm the updates took effect. | |
spark.sql("select partitionpath, uuid, batch_id, rider, ts, fare from hudi_tbl2 order by batch_id").show(100, false) | |
// count records per batch/commit | |
spark.sql("select batch_id, count(*) from hudi_tbl2 group by 1").show(100, false) | |
// verify that 11 records still belong to batch1 and batch2 combined, while 4 belong to batch3 (the updates). | |
spark.sql("select _hoodie_commit_time, batch_id, count(*) from hudi_tbl2 group by 1,2 ").show(100, false) | |
// deletes | |
val deletes = convertToStringList(dataGen.generateUniqueUpdates(2)) | |
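// generateUniqueUpdates(2) is reused here just to pick 2 existing keys; for the delete
// below, essentially only the record key (uuid) and partition path of each row matter.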
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 1)) | |
// printing the records to be deleted. | |
df.select("partitionPath","uuid","ts","rider","fare").show(5, false) | |
val toDeleteDf = df | |
toDeleteDf.createOrReplaceTempView("toDelete_tbl") | |
// count total records before delete | |
spark.sql("select count(*) from hudi_tbl2").show(false) | |
df.withColumn("batch_id",lit("batch4")).write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | |
option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | |
option("hoodie.datasource.write.hive_style_partitioning","true"). | |
option("hoodie.partition.metafile.use.base.format","true"). | |
option("hoodie.datasource.write.drop.partition.columns","true"). | |
option("hoodie.datasource.write.operation","delete"). | |
option(TABLE_NAME, tableName). | |
mode(Append). | |
save(basePath) | |
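// With hoodie.datasource.write.operation=delete, the incoming rows act as delete markers:
// records whose keys match are removed from the table, and no batch4 rows are persisted.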
spark. | |
read. | |
format("hudi"). | |
load(basePath).createOrReplaceTempView("hudi_tbl3") | |
spark.sql("select partitionpath, uuid, batch_id, rider, ts, fare from hudi_tbl3 order by batch_id").show(100, false) | |
// ensure total count reduced by 2 | |
spark.sql("select count(*) from hudi_tbl3").show(false) | |
// count records per batch/commit | |
spark.sql("select batch_id, count(*) from hudi_tbl3 group by 1").show(100, false) | |
// verify that no record carries batch_id = batch4: that batch was written with the delete operation, so it only removed existing keys. | |
spark.sql("select _hoodie_commit_time, batch_id, count(*) from hudi_tbl3 group by 1,2 ").show(100, false) | |
// verify that none of the deleted keys (in toDelete_tbl) remain in the table | |
spark.sql("select count(*) from hudi_tbl3 where uuid in (select uuid from toDelete_tbl)").show() | |
// soft deletes | |
val softDeleteDs = spark.sql("select * from hudi_tbl3").limit(2) | |
// prepare the soft deletes by ensuring the appropriate fields are nullified | |
val nullifyColumns = softDeleteDs.schema.fields. | |
map(field => (field.name, field.dataType.typeName)). | |
filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1) | |
&& !Array("ts", "uuid", "partitionpath").contains(pair._1))) | |
val softDeleteDf = nullifyColumns. | |
foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))( | |
(ds, col) => ds.withColumn(col._1, lit(null).cast(col._2))) | |
// verify that every column except partitionpath, uuid, and ts is nullified. | |
softDeleteDf.show(2, false) | |
softDeleteDf.createOrReplaceTempView("softDelete_tbl") | |
// count the total records before soft deletes | |
spark.sql("select count(*) from hudi_tbl3").show() | |
// write the nullified records back. Note that the write below sets hoodie.datasource.write.operation=delete, which hard-deletes these two keys; for a true soft delete (keep the key with the other fields nulled), drop that option and rely on the default upsert operation as in the Hudi quickstart. | |
softDeleteDf.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | |
option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | |
option("hoodie.datasource.write.hive_style_partitioning","true"). | |
option("hoodie.partition.metafile.use.base.format","true"). | |
option("hoodie.datasource.write.drop.partition.columns","true"). | |
option("hoodie.datasource.write.operation","delete"). | |
option(TABLE_NAME, tableName). | |
mode(Append). | |
save(basePath) | |
spark. | |
read. | |
format("hudi"). | |
load(basePath).createOrReplaceTempView("hudi_tbl4") | |
spark.sql("select partitionpath, uuid, batch_id, rider, ts, fare from hudi_tbl4 order by batch_id").show(100, false) | |
// count records per batch/commit | |
spark.sql("select batch_id, count(*) from hudi_tbl4 group by 1").show(100, false) | |
// per-commit counts after this write: still nothing with batch_id = batch4, and batch1's count has dropped by the 2 records removed above. | |
spark.sql("select _hoodie_commit_time, batch_id, count(*) from hudi_tbl4 group by 1,2 ").show(100, false) | |
// verify that total count is reduced by 2 compared to the total record count we found before | |
spark.sql("select count(*) from hudi_tbl4").show() | |
// verify that none of the keys in softDelete_tbl remain in the table | |
spark.sql("select count(*) from hudi_tbl4 where uuid in (select uuid from softDelete_tbl)").show() | |
Sample spark-shell transcript from running the commands above:
scala> import org.apache.hudi.QuickstartUtils._ | |
import org.apache.hudi.QuickstartUtils._ | |
scala> import scala.collection.JavaConversions._ | |
import scala.collection.JavaConversions._ | |
scala> import org.apache.spark.sql.SaveMode._ | |
import org.apache.spark.sql.SaveMode._ | |
scala> import org.apache.hudi.DataSourceReadOptions._ | |
import org.apache.hudi.DataSourceReadOptions._ | |
scala> import org.apache.hudi.DataSourceWriteOptions._ | |
import org.apache.hudi.DataSourceWriteOptions._ | |
scala> import org.apache.hudi.config.HoodieWriteConfig._ | |
import org.apache.hudi.config.HoodieWriteConfig._ | |
scala> import org.apache.hudi.common.model.HoodieRecord | |
import org.apache.hudi.common.model.HoodieRecord | |
scala> import org.apache.spark.sql.functions._ | |
import org.apache.spark.sql.functions._ | |
scala> | |
scala> val tableName = "hudi_trips_bq" | |
tableName: String = hudi_trips_bq | |
scala> val basePath = "file:///tmp/hudi_trips_bq" | |
basePath: String = file:///tmp/hudi_trips_bq | |
scala> val dataGen = new DataGenerator | |
dataGen: org.apache.hudi.QuickstartUtils.DataGenerator = org.apache.hudi.QuickstartUtils$DataGenerator@49ce0667 | |
scala> | |
scala> val inserts = convertToStringList(dataGen.generateInserts(10)) | |
inserts: java.util.List[String] = [{"ts": 1690177939963, "uuid": "9d03d7d4-ed08-4189-a8aa-39c49415d8cb", "rider": "rider-668", "driver": "driver-668", "begin_lat": 0.13625652434397972, "begin_lon": 0.621688297381891, "end_lat": 0.7970646814292182, "end_lon": 0.3852529405012508, "fare": 69.36363684236434, "partitionpath": "americas/united_states/san_francisco"}, "partitionpath": "americas/united_states/san_francisco"}, {"ts": 1690609944610, "uuid": "c025d239-70b0-4a22-ac0b-a9502c83cd61", "rider": "rider-668", "driver": "driver-668", "begin_lat": 0.9115819084017496, "begin_lon": 0.8414360533180016, "end_lat": 0.6105448818666265, "end_lon": 0.8006778279930239, "fare": 62.79408654844148, "partitionpath": "americas/brazil/sao_paulo"}, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 169... | |
scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) | |
warning: there was one deprecation warning; re-run with -deprecation for details | |
df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields] | |
scala> df.withColumn("batch_id",lit("batch1")).write.format("hudi"). | |
| options(getQuickstartWriteConfigs). | |
| option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | |
| option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | |
| option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | |
| option("hoodie.datasource.write.hive_style_partitioning","true"). | |
| option("hoodie.partition.metafile.use.base.format","true"). | |
| option("hoodie.datasource.write.drop.partition.columns","true"). | |
| option(TABLE_NAME, tableName). | |
| mode(Overwrite). | |
| save(basePath) | |
warning: there was one deprecation warning; re-run with -deprecation for details | |
23/07/29 20:52:31 WARN HoodieSparkSqlWriter$: hoodie table at file:/tmp/hudi_trips_bq already exists. Deleting existing data & overwriting with new data. | |
23/07/29 20:52:31 WARN HoodieBackedTableMetadata: Metadata table was not found at path file:/tmp/hudi_trips_bq/.hoodie/metadata | |
scala> | |
scala> | |
scala> spark. | |
| read. | |
| format("hudi"). | |
| load(basePath).createOrReplaceTempView("hudi_tbl") | |
scala> | |
scala> spark.sql("describe hudi_tbl").show(100, false) | |
+----------------------+---------+-------+ | |
|col_name |data_type|comment| | |
+----------------------+---------+-------+ | |
|_hoodie_commit_time |string |null | | |
|_hoodie_commit_seqno |string |null | | |
|_hoodie_record_key |string |null | | |
|_hoodie_partition_path|string |null | | |
|_hoodie_file_name |string |null | | |
|begin_lat |double |null | | |
|begin_lon |double |null | | |
|driver |string |null | | |
|end_lat |double |null | | |
|end_lon |double |null | | |
|fare |double |null | | |
|rider |string |null | | |
|ts |bigint |null | | |
|uuid |string |null | | |
|batch_id |string |null | | |
|partitionpath |string |null | | |
+----------------------+---------+-------+ | |
scala> | |
scala> spark.sql("select partitionpath, uuid, batch_id, rider, ts, fare from hudi_tbl").show(100, false) | |
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+ | |
|partitionpath |uuid |batch_id|rider |ts |fare | | |
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+ | |
|americas/united_states/san_francisco|7e74bcf3-ccd1-43e3-a428-6132a3a835bf|batch1 |rider-668|1690508884178|21.39416477932461 | | |
|americas/united_states/san_francisco|1c6f9211-81c9-4fd5-b10c-c07b591dae7b|batch1 |rider-668|1690299840167|41.57780462795554 | | |
|americas/united_states/san_francisco|546d5cc4-14c3-4505-b3c8-9c49efe6b349|batch1 |rider-668|1690401346218|49.55662199969524 | | |
|americas/united_states/san_francisco|d73df5f0-7feb-4d2b-bb91-1c3e6fd8380d|batch1 |rider-668|1690111294971|60.57368782999865 | | |
|americas/united_states/san_francisco|9d03d7d4-ed08-4189-a8aa-39c49415d8cb|batch1 |rider-668|1690177939963|69.36363684236434 | | |
|americas/united_states/san_francisco|4409ddd3-c79b-4587-865a-c534c15ea56b|batch1 |rider-668|1690536465531|94.67524853429937 | | |
|americas/brazil/sao_paulo |104c4af1-74ec-40d5-af58-c59edf06925c|batch1 |rider-668|1690235686261|57.68639743536661 | | |
|americas/brazil/sao_paulo |c025d239-70b0-4a22-ac0b-a9502c83cd61|batch1 |rider-668|1690609944610|62.79408654844148 | | |
|americas/brazil/sao_paulo |a6219312-cb78-4bd7-8978-177ba783c67f|batch1 |rider-668|1690474005794|25.667476181356484| | |
|asia/india/chennai |ab32477b-401f-4659-9f3d-93a211ccc067|batch1 |rider-668|1690301176702|91.77262409248691 | | |
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+ | |
scala> | |
scala> | |
scala> // more inserts. | |
scala> | |
scala> val inserts = convertToStringList(dataGen.generateInserts(5)) | |
inserts: java.util.List[String] = [{"ts": 1690350229349, "uuid": "de84a55a-5761-4fc3-9088-3f930e88da03", "rider": "rider-048", "driver": "driver-048", "begin_lat": 0.7004607721204296, "begin_lon": 0.9911546157239198, "end_lat": 0.14602911545960373, "end_lon": 0.430070203188727, "fare": 89.89115113672493, "partitionpath": "americas/brazil/sao_paulo"}, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 1690305541919, "uuid": "48170c31-f785-4685-a586-07a80aa3656e", "rider": "rider-048", "driver": "driver-048", "begin_lat": 0.011933855867048981, "begin_lon": 0.16258177392270334, "end_lat": 0.9635314017496284, "end_lon": 0.6451866124948767, "fare": 69.09535493302582, "partitionpath": "americas/brazil/sao_paulo"}, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 1690163733693, "uuid":... | |
scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 1)) | |
warning: there was one deprecation warning; re-run with -deprecation for details | |
df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields] | |
scala> | |
scala> df.withColumn("batch_id",lit("batch2")).write.format("hudi"). | |
| options(getQuickstartWriteConfigs). | |
| option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | |
| option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | |
| option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | |
| option("hoodie.datasource.write.hive_style_partitioning","true"). | |
| option("hoodie.partition.metafile.use.base.format","true"). | |
| option("hoodie.datasource.write.drop.partition.columns","true"). | |
| option(TABLE_NAME, tableName). | |
| mode(Append). | |
| save(basePath) | |
warning: there was one deprecation warning; re-run with -deprecation for details | |
scala> | |
scala> // 10 in batch1, 5 in batch2 | |
scala> | |
scala> spark. | |
| read. | |
| format("hudi"). | |
| load(basePath).createOrReplaceTempView("hudi_tbl1") | |
scala> | |
scala> spark.sql("select partitionpath, uuid, batch_id, rider, ts, fare from hudi_tbl1 order by batch_id").show(100, false) | |
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+ | |
|partitionpath |uuid |batch_id|rider |ts |fare | | |
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+ | |
|americas/united_states/san_francisco|7e74bcf3-ccd1-43e3-a428-6132a3a835bf|batch1 |rider-668|1690508884178|21.39416477932461 | | |
|americas/united_states/san_francisco|d73df5f0-7feb-4d2b-bb91-1c3e6fd8380d|batch1 |rider-668|1690111294971|60.57368782999865 | | |
|americas/united_states/san_francisco|9d03d7d4-ed08-4189-a8aa-39c49415d8cb|batch1 |rider-668|1690177939963|69.36363684236434 | | |
|americas/united_states/san_francisco|4409ddd3-c79b-4587-865a-c534c15ea56b|batch1 |rider-668|1690536465531|94.67524853429937 | | |
|americas/united_states/san_francisco|1c6f9211-81c9-4fd5-b10c-c07b591dae7b|batch1 |rider-668|1690299840167|41.57780462795554 | | |
|asia/india/chennai |ab32477b-401f-4659-9f3d-93a211ccc067|batch1 |rider-668|1690301176702|91.77262409248691 | | |
|americas/united_states/san_francisco|546d5cc4-14c3-4505-b3c8-9c49efe6b349|batch1 |rider-668|1690401346218|49.55662199969524 | | |
|americas/brazil/sao_paulo |c025d239-70b0-4a22-ac0b-a9502c83cd61|batch1 |rider-668|1690609944610|62.79408654844148 | | |
|americas/brazil/sao_paulo |104c4af1-74ec-40d5-af58-c59edf06925c|batch1 |rider-668|1690235686261|57.68639743536661 | | |
|americas/brazil/sao_paulo |a6219312-cb78-4bd7-8978-177ba783c67f|batch1 |rider-668|1690474005794|25.667476181356484| | |
|asia/india/chennai |ec73d529-319b-4558-8a70-1eac41e31414|batch2 |rider-048|1690294010867|51.299844734112945| | |
|americas/brazil/sao_paulo |48170c31-f785-4685-a586-07a80aa3656e|batch2 |rider-048|1690305541919|69.09535493302582 | | |
|asia/india/chennai |81647218-69c0-4c04-b804-17567b6a45b5|batch2 |rider-048|1690101668285|96.4500716154594 | | |
|americas/brazil/sao_paulo |9e7fe1f1-4d45-418f-afe0-1557bd74e8dc|batch2 |rider-048|1690163733693|17.63327305991089 | | |
|americas/brazil/sao_paulo |de84a55a-5761-4fc3-9088-3f930e88da03|batch2 |rider-048|1690350229349|89.89115113672493 | | |
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+ | |
scala> | |
scala> // count records per batch/commit | |
scala> spark.sql("select batch_id, count(*) from hudi_tbl1 group by 1").show(100, false) | |
+--------+--------+ | |
|batch_id|count(1)| | |
+--------+--------+ | |
|batch1 |10 | | |
|batch2 |5 | | |
+--------+--------+ | |
scala> | |
scala> // we can verify that 5 of them belong to batch2 | |
scala> spark.sql("select _hoodie_commit_time, batch_id, count(*) from hudi_tbl1 group by 1,2 ").show(100, false) | |
+-------------------+--------+--------+ | |
|_hoodie_commit_time|batch_id|count(1)| | |
+-------------------+--------+--------+ | |
|20230729205231238 |batch1 |10 | | |
|20230729205235432 |batch2 |5 | | |
+-------------------+--------+--------+ | |
scala> | |
scala> | |
scala> // update records | |
scala> val updates = convertToStringList(dataGen.generateUniqueUpdates(4)) | |
updates: java.util.List[String] = [{"ts": 1690655608668, "uuid": "1c6f9211-81c9-4fd5-b10c-c07b591dae7b", "rider": "rider-837", "driver": "driver-837", "begin_lat": 0.3533375512548106, "begin_lon": 0.25216729525590675, "end_lat": 0.6372033416112669, "end_lon": 0.5844540681855337, "fare": 80.517971387309, "partitionpath": "americas/united_states/san_francisco"}, "partitionpath": "americas/united_states/san_francisco"}, {"ts": 1690178136769, "uuid": "4409ddd3-c79b-4587-865a-c534c15ea56b", "rider": "rider-837", "driver": "driver-837", "begin_lat": 0.7230615673273981, "begin_lon": 0.3608052937112112, "end_lat": 0.8250750215077437, "end_lon": 0.030241465200331774, "fare": 74.7341018556108, "partitionpath": "americas/united_states/san_francisco"}, "partitionpath": "americas/united_states/san_f... | |
scala> val df = spark.read.json(spark.sparkContext.parallelize(updates, 1)) | |
warning: there was one deprecation warning; re-run with -deprecation for details | |
df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields] | |
scala> | |
scala> // printing the records to be upserted. | |
scala> df.select("partitionPath","uuid","ts","rider","fare").show(5, false) | |
+------------------------------------+------------------------------------+-------------+---------+-----------------+ | |
|partitionPath |uuid |ts |rider |fare | | |
+------------------------------------+------------------------------------+-------------+---------+-----------------+ | |
|americas/united_states/san_francisco|1c6f9211-81c9-4fd5-b10c-c07b591dae7b|1690655608668|rider-837|80.517971387309 | | |
|americas/united_states/san_francisco|4409ddd3-c79b-4587-865a-c534c15ea56b|1690178136769|rider-837|74.7341018556108 | | |
|americas/united_states/san_francisco|9d03d7d4-ed08-4189-a8aa-39c49415d8cb|1690309939155|rider-837|16.13290418880422| | |
|americas/brazil/sao_paulo |9e7fe1f1-4d45-418f-afe0-1557bd74e8dc|1690236576603|rider-837|6.860897540622235| | |
+------------------------------------+------------------------------------+-------------+---------+-----------------+ | |
scala> | |
scala> df.withColumn("batch_id",lit("batch3")).write.format("hudi"). | |
| options(getQuickstartWriteConfigs). | |
| option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | |
| option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | |
| option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | |
| option("hoodie.datasource.write.hive_style_partitioning","true"). | |
| option("hoodie.partition.metafile.use.base.format","true"). | |
| option("hoodie.datasource.write.drop.partition.columns","true"). | |
| option(TABLE_NAME, tableName). | |
| mode(Append). | |
| save(basePath) | |
warning: there was one deprecation warning; re-run with -deprecation for details | |
scala> | |
scala> | |
scala> spark. | |
| read. | |
| format("hudi"). | |
| load(basePath).createOrReplaceTempView("hudi_tbl2") | |
scala> | |
scala> // we can see that only 4 records have batch_id = "batch3". we can also verify w/ uuid shown above. check for rider value or batch_id value or fare value to validate the updates. | |
scala> | |
scala> spark.sql("select partitionpath, uuid, batch_id, rider, ts, fare from hudi_tbl2 order by batch_id").show(100, false) | |
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+ | |
|partitionpath |uuid |batch_id|rider |ts |fare | | |
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+ | |
|asia/india/chennai |ab32477b-401f-4659-9f3d-93a211ccc067|batch1 |rider-668|1690301176702|91.77262409248691 | | |
|americas/brazil/sao_paulo |c025d239-70b0-4a22-ac0b-a9502c83cd61|batch1 |rider-668|1690609944610|62.79408654844148 | | |
|americas/brazil/sao_paulo |104c4af1-74ec-40d5-af58-c59edf06925c|batch1 |rider-668|1690235686261|57.68639743536661 | | |
|americas/brazil/sao_paulo |a6219312-cb78-4bd7-8978-177ba783c67f|batch1 |rider-668|1690474005794|25.667476181356484| | |
|americas/united_states/san_francisco|546d5cc4-14c3-4505-b3c8-9c49efe6b349|batch1 |rider-668|1690401346218|49.55662199969524 | | |
|americas/united_states/san_francisco|7e74bcf3-ccd1-43e3-a428-6132a3a835bf|batch1 |rider-668|1690508884178|21.39416477932461 | | |
|americas/united_states/san_francisco|d73df5f0-7feb-4d2b-bb91-1c3e6fd8380d|batch1 |rider-668|1690111294971|60.57368782999865 | | |
|asia/india/chennai |81647218-69c0-4c04-b804-17567b6a45b5|batch2 |rider-048|1690101668285|96.4500716154594 | | |
|americas/brazil/sao_paulo |48170c31-f785-4685-a586-07a80aa3656e|batch2 |rider-048|1690305541919|69.09535493302582 | | |
|asia/india/chennai |ec73d529-319b-4558-8a70-1eac41e31414|batch2 |rider-048|1690294010867|51.299844734112945| | |
|americas/brazil/sao_paulo |de84a55a-5761-4fc3-9088-3f930e88da03|batch2 |rider-048|1690350229349|89.89115113672493 | | |
|americas/brazil/sao_paulo |9e7fe1f1-4d45-418f-afe0-1557bd74e8dc|batch3 |rider-837|1690236576603|6.860897540622235 | | |
|americas/united_states/san_francisco|1c6f9211-81c9-4fd5-b10c-c07b591dae7b|batch3 |rider-837|1690655608668|80.517971387309 | | |
|americas/united_states/san_francisco|4409ddd3-c79b-4587-865a-c534c15ea56b|batch3 |rider-837|1690178136769|74.7341018556108 | | |
|americas/united_states/san_francisco|9d03d7d4-ed08-4189-a8aa-39c49415d8cb|batch3 |rider-837|1690309939155|16.13290418880422 | | |
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+ | |
scala> | |
scala> // count records per batch/commit | |
scala> spark.sql("select batch_id, count(*) from hudi_tbl2 group by 1").show(100, false) | |
+--------+--------+ | |
|batch_id|count(1)| | |
+--------+--------+ | |
|batch1 |7 | | |
|batch2 |4 | | |
|batch3 |4 | | |
+--------+--------+ | |
scala> | |
scala> // we can verify that 11 of them belong to batch1 and batch2(combined) and 4 belong to batch3(updates). | |
scala> spark.sql("select _hoodie_commit_time, batch_id, count(*) from hudi_tbl2 group by 1,2 ").show(100, false) | |
+-------------------+--------+--------+ | |
|_hoodie_commit_time|batch_id|count(1)| | |
+-------------------+--------+--------+ | |
|20230729205240123 |batch3 |4 | | |
|20230729205231238 |batch1 |7 | | |
|20230729205235432 |batch2 |4 | | |
+-------------------+--------+--------+ | |
scala> | |
scala> | |
scala> | |
scala> | |
scala> // deletes | |
scala> | |
scala> val deletes = convertToStringList(dataGen.generateUniqueUpdates(2)) | |
deletes: java.util.List[String] = [{"ts": 1690586933363, "uuid": "9d03d7d4-ed08-4189-a8aa-39c49415d8cb", "rider": "rider-707", "driver": "driver-707", "begin_lat": 0.8183759067610148, "begin_lon": 0.5936482561544364, "end_lat": 0.19655483974779764, "end_lon": 0.29594364305313803, "fare": 55.51725492240046, "partitionpath": "americas/united_states/san_francisco"}, "partitionpath": "americas/united_states/san_francisco"}, {"ts": 1690135701138, "uuid": "ab32477b-401f-4659-9f3d-93a211ccc067", "rider": "rider-707", "driver": "driver-707", "begin_lat": 0.5488633090101539, "begin_lon": 0.9785314544611191, "end_lat": 0.13644545265446528, "end_lon": 0.7338703063422535, "fare": 8.046423637086441, "partitionpath": "asia/india/chennai"}, "partitionpath": "asia/india/chennai"}] | |
scala> val df = spark.read.json(spark.sparkContext.parallelize(deletes, 1)) | |
warning: there was one deprecation warning; re-run with -deprecation for details | |
df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields] | |
scala> | |
scala> // printing the records to be deleted. | |
scala> df.select("partitionPath","uuid","ts","rider","fare").show(5, false) | |
+------------------------------------+------------------------------------+-------------+---------+-----------------+ | |
|partitionPath |uuid |ts |rider |fare | | |
+------------------------------------+------------------------------------+-------------+---------+-----------------+ | |
|americas/united_states/san_francisco|9d03d7d4-ed08-4189-a8aa-39c49415d8cb|1690586933363|rider-707|55.51725492240046| | |
|asia/india/chennai |ab32477b-401f-4659-9f3d-93a211ccc067|1690135701138|rider-707|8.046423637086441| | |
+------------------------------------+------------------------------------+-------------+---------+-----------------+ | |
scala> val toDeleteDf = df | |
toDeleteDf: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields] | |
scala> toDeleteDf.createOrReplaceTempView("toDelete_tbl") | |
scala> | |
scala> // count total records before delete | |
scala> | |
scala> spark.sql("select count(*) from hudi_tbl2").show(false) | |
+--------+ | |
|count(1)| | |
+--------+ | |
|15 | | |
+--------+ | |
scala> | |
scala> df.withColumn("batch_id",lit("batch4")).write.format("hudi"). | |
| options(getQuickstartWriteConfigs). | |
| option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | |
| option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | |
| option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | |
| option("hoodie.datasource.write.hive_style_partitioning","true"). | |
| option("hoodie.partition.metafile.use.base.format","true"). | |
| option("hoodie.datasource.write.drop.partition.columns","true"). | |
| option("hoodie.datasource.write.operation","delete"). | |
| option(TABLE_NAME, tableName). | |
| mode(Append). | |
| save(basePath) | |
warning: there was one deprecation warning; re-run with -deprecation for details | |
scala> | |
scala> | |
scala> spark. | |
| read. | |
| format("hudi"). | |
| load(basePath).createOrReplaceTempView("hudi_tbl3") | |
scala> | |
scala> spark.sql("select partitionpath, uuid, batch_id, rider, ts, fare from hudi_tbl3 order by batch_id").show(100, false) | |
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+ | |
|partitionpath |uuid |batch_id|rider |ts |fare | | |
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+ | |
|americas/united_states/san_francisco|546d5cc4-14c3-4505-b3c8-9c49efe6b349|batch1 |rider-668|1690401346218|49.55662199969524 | | |
|americas/united_states/san_francisco|7e74bcf3-ccd1-43e3-a428-6132a3a835bf|batch1 |rider-668|1690508884178|21.39416477932461 | | |
|americas/united_states/san_francisco|d73df5f0-7feb-4d2b-bb91-1c3e6fd8380d|batch1 |rider-668|1690111294971|60.57368782999865 | | |
|americas/brazil/sao_paulo |c025d239-70b0-4a22-ac0b-a9502c83cd61|batch1 |rider-668|1690609944610|62.79408654844148 | | |
|americas/brazil/sao_paulo |104c4af1-74ec-40d5-af58-c59edf06925c|batch1 |rider-668|1690235686261|57.68639743536661 | | |
|americas/brazil/sao_paulo |a6219312-cb78-4bd7-8978-177ba783c67f|batch1 |rider-668|1690474005794|25.667476181356484| | |
|asia/india/chennai |ec73d529-319b-4558-8a70-1eac41e31414|batch2 |rider-048|1690294010867|51.299844734112945| | |
|americas/brazil/sao_paulo |48170c31-f785-4685-a586-07a80aa3656e|batch2 |rider-048|1690305541919|69.09535493302582 | | |
|asia/india/chennai |81647218-69c0-4c04-b804-17567b6a45b5|batch2 |rider-048|1690101668285|96.4500716154594 | | |
|americas/brazil/sao_paulo |de84a55a-5761-4fc3-9088-3f930e88da03|batch2 |rider-048|1690350229349|89.89115113672493 | | |
|americas/united_states/san_francisco|1c6f9211-81c9-4fd5-b10c-c07b591dae7b|batch3 |rider-837|1690655608668|80.517971387309 | | |
|americas/united_states/san_francisco|4409ddd3-c79b-4587-865a-c534c15ea56b|batch3 |rider-837|1690178136769|74.7341018556108 | | |
|americas/brazil/sao_paulo |9e7fe1f1-4d45-418f-afe0-1557bd74e8dc|batch3 |rider-837|1690236576603|6.860897540622235 | | |
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+ | |
scala> | |
scala> // ensure total count reduced by 2 | |
scala> spark.sql("select count(*) from hudi_tbl3").show(false) | |
+--------+ | |
|count(1)| | |
+--------+ | |
|13 | | |
+--------+ | |
scala> | |
scala> // count records per batch/commit | |
scala> spark.sql("select batch_id, count(*) from hudi_tbl3 group by 1").show(100, false) | |
+--------+--------+ | |
|batch_id|count(1)| | |
+--------+--------+ | |
|batch1 |6 | | |
|batch2 |4 | | |
|batch3 |3 | | |
+--------+--------+ | |
scala> | |
scala> // we can verify that there is none with batchId = batch4 (since batch 4 records are deleted). | |
scala> spark.sql("select _hoodie_commit_time, batch_id, count(*) from hudi_tbl3 group by 1,2 ").show(100, false) | |
+-------------------+--------+--------+ | |
|_hoodie_commit_time|batch_id|count(1)| | |
+-------------------+--------+--------+ | |
|20230729205240123 |batch3 |3 | | |
|20230729205231238 |batch1 |6 | | |
|20230729205235432 |batch2 |4 | | |
+-------------------+--------+--------+ | |
scala> | |
scala> // verify that we don't find any records from to_delete df | |
scala> spark.sql("select count(*) from hudi_tbl3 where uuid in (select uuid from toDelete_tbl)").show() | |
+--------+ | |
|count(1)| | |
+--------+ | |
| 0| | |
+--------+ | |
scala> | |
scala> | |
scala> | |
scala> | |
scala> | |
scala> | |
scala> | |
scala> // soft deletes | |
scala> val softDeleteDs = spark.sql("select * from hudi_tbl3").limit(2) | |
softDeleteDs: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 14 more fields] | |
scala> | |
scala> // prepare the soft deletes by ensuring the appropriate fields are nullified | |
scala> val nullifyColumns = softDeleteDs.schema.fields. | |
| map(field => (field.name, field.dataType.typeName)). | |
| filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1) | |
| && !Array("ts", "uuid", "partitionpath").contains(pair._1))) | |
nullifyColumns: Array[(String, String)] = Array((begin_lat,double), (begin_lon,double), (driver,string), (end_lat,double), (end_lon,double), (fare,double), (rider,string), (batch_id,string)) | |
scala> | |
scala> val softDeleteDf = nullifyColumns. | |
| foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))( | |
| (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2))) | |
softDeleteDf: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 9 more fields] | |
scala> | |
scala> // verify that except partitionpath, uuid and ts every other columns is nullified. | |
scala> softDeleteDf.show(2, false) | |
+---------+---------+------+-------+-------+----+-----+-------------+------------------------------------+--------+-------------------------+ | |
|begin_lat|begin_lon|driver|end_lat|end_lon|fare|rider|ts |uuid |batch_id|partitionpath | | |
+---------+---------+------+-------+-------+----+-----+-------------+------------------------------------+--------+-------------------------+ | |
|null |null |null |null |null |null|null |1690235686261|104c4af1-74ec-40d5-af58-c59edf06925c|null |americas/brazil/sao_paulo| | |
|null |null |null |null |null |null|null |1690609944610|c025d239-70b0-4a22-ac0b-a9502c83cd61|null |americas/brazil/sao_paulo| | |
+---------+---------+------+-------+-------+----+-----+-------------+------------------------------------+--------+-------------------------+ | |
scala> | |
scala> softDeleteDf.createOrReplaceTempView("softDelete_tbl") | |
scala> | |
scala> // count the total records before soft deletes | |
scala> | |
scala> spark.sql("select count(*) from hudi_tbl3").show() | |
+--------+ | |
|count(1)| | |
+--------+ | |
| 13| | |
+--------+ | |
scala> | |
scala> // simply upsert the table after setting these fields to null | |
scala> softDeleteDf.write.format("hudi"). | |
| options(getQuickstartWriteConfigs). | |
| option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | |
| option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | |
| option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | |
| option("hoodie.datasource.write.hive_style_partitioning","true"). | |
| option("hoodie.partition.metafile.use.base.format","true"). | |
| option("hoodie.datasource.write.drop.partition.columns","true"). | |
| option("hoodie.datasource.write.operation","delete"). | |
| option(TABLE_NAME, tableName). | |
| mode(Append). | |
| save(basePath) | |
warning: there was one deprecation warning; re-run with -deprecation for details | |
scala> | |
scala> | |
scala> spark. | |
| read. | |
| format("hudi"). | |
| load(basePath).createOrReplaceTempView("hudi_tbl4") | |
scala> | |
scala> spark.sql("select partitionpath, uuid, batch_id, rider, ts, fare from hudi_tbl4 order by batch_id").show(100, false) | |
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+ | |
|partitionpath |uuid |batch_id|rider |ts |fare | | |
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+ | |
|americas/brazil/sao_paulo |a6219312-cb78-4bd7-8978-177ba783c67f|batch1 |rider-668|1690474005794|25.667476181356484| | |
|americas/united_states/san_francisco|d73df5f0-7feb-4d2b-bb91-1c3e6fd8380d|batch1 |rider-668|1690111294971|60.57368782999865 | | |
|americas/united_states/san_francisco|546d5cc4-14c3-4505-b3c8-9c49efe6b349|batch1 |rider-668|1690401346218|49.55662199969524 | | |
|americas/united_states/san_francisco|7e74bcf3-ccd1-43e3-a428-6132a3a835bf|batch1 |rider-668|1690508884178|21.39416477932461 | | |
|americas/brazil/sao_paulo |48170c31-f785-4685-a586-07a80aa3656e|batch2 |rider-048|1690305541919|69.09535493302582 | | |
|americas/brazil/sao_paulo |de84a55a-5761-4fc3-9088-3f930e88da03|batch2 |rider-048|1690350229349|89.89115113672493 | | |
|asia/india/chennai |ec73d529-319b-4558-8a70-1eac41e31414|batch2 |rider-048|1690294010867|51.299844734112945| | |
|asia/india/chennai |81647218-69c0-4c04-b804-17567b6a45b5|batch2 |rider-048|1690101668285|96.4500716154594 | | |
|americas/brazil/sao_paulo |9e7fe1f1-4d45-418f-afe0-1557bd74e8dc|batch3 |rider-837|1690236576603|6.860897540622235 | | |
|americas/united_states/san_francisco|1c6f9211-81c9-4fd5-b10c-c07b591dae7b|batch3 |rider-837|1690655608668|80.517971387309 | | |
|americas/united_states/san_francisco|4409ddd3-c79b-4587-865a-c534c15ea56b|batch3 |rider-837|1690178136769|74.7341018556108 | | |
+------------------------------------+------------------------------------+--------+---------+-------------+------------------+ | |
scala> | |
scala> // count records per batch/commit | |
scala> spark.sql("select batch_id, count(*) from hudi_tbl4 group by 1").show(100, false) | |
+--------+--------+ | |
|batch_id|count(1)| | |
+--------+--------+ | |
|batch1 |4 | | |
|batch2 |4 | | |
|batch3 |3 | | |
+--------+--------+ | |
scala> | |
scala> // we can verify that there is none with batchId = batch4 (since batch 4 records are deleted). | |
scala> spark.sql("select _hoodie_commit_time, batch_id, count(*) from hudi_tbl4 group by 1,2 ").show(100, false) | |
+-------------------+--------+--------+ | |
|_hoodie_commit_time|batch_id|count(1)| | |
+-------------------+--------+--------+ | |
|20230729205240123 |batch3 |3 | | |
|20230729205231238 |batch1 |4 | | |
|20230729205235432 |batch2 |4 | | |
+-------------------+--------+--------+ | |
scala> | |
scala> // verify that total count is reduced by 2 compared to the total record count we found before | |
scala> spark.sql("select count(*) from hudi_tbl4").show() | |
+--------+ | |
|count(1)| | |
+--------+ | |
| 11| | |
+--------+ | |
scala> | |
scala> | |
scala> // verify that we don't find any records from softDelete_tbl | |
scala> spark.sql("select count(*) from hudi_tbl4 where uuid in (select uuid from softDelete_tbl)").show() | |
+--------+ | |
|count(1)| | |
+--------+ | |
| 0| | |
+--------+ | |