Skip to content

Instantly share code, notes, and snippets.

@ad1happy2go
Created May 30, 2024 06:56
Show Gist options
  • Save ad1happy2go/bad6a68f5aedd97aaa68ea82be265e7c to your computer and use it in GitHub Desktop.
from pyspark.sql.functions import lit, col

# Demo: write a Hudi MERGE_ON_READ table with CDC enabled, update a few rows,
# then read the change-data-capture (CDC) log via an incremental query.
# Assumes a live `spark` session (e.g. spark-shell / pyspark launched with the
# Hudi bundle on the classpath).

tableName = "trips_table"
basePath = "file:///tmp/trips_table"

# Sample trip records: (event timestamp ms, record key, rider, driver, fare, partition city)
columns = ["ts", "uuid", "rider", "driver", "fare", "city"]
data = [
    (1695159649087, "334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A", "driver-K", 19.10, "san_francisco"),
    (1695091554788, "e96c4396-3fad-413a-a942-4cb36106d721", "rider-B", "driver-L", 27.70, "san_francisco"),
    (1695046462179, "9909a8b1-2d15-4d3d-8ec9-efc48c536a00", "rider-C", "driver-M", 33.90, "san_francisco"),
    (1695516137016, "e3cf430c-889d-4015-bc98-59bdce1e530c", "rider-C", "driver-N", 34.15, "sao_paulo"),
]
inserts = spark.createDataFrame(data).toDF(*columns)

hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.partitionpath.field': 'city',
    'hoodie.table.cdc.enabled': 'true',  # persist per-commit change data for CDC queries
    'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
    'hoodie.datasource.write.recordkey.field': 'uuid',
    # Explicit precombine key: on duplicate record keys, the row with the
    # larger `ts` wins (this is also Hudi's default precombine field name).
    'hoodie.datasource.write.precombine.field': 'ts',
}

# Initial insert: overwrite creates the table at basePath.
inserts.write.format("hudi"). \
    options(**hudi_options). \
    mode("overwrite"). \
    save(basePath)

# Update: 10x the fare for rider-A and rider-B, then upsert (append mode).
# Pass the same write options explicitly rather than relying solely on the
# table config persisted at basePath.
updatesDf = spark.read.format("hudi"). \
    load(basePath). \
    filter("rider == 'rider-A' or rider == 'rider-B'"). \
    withColumn("fare", col("fare") * 10)
updatesDf.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(basePath)

# Query CDC data: incremental read from the beginning of the timeline.
# NOTE: datasource options are string-typed — begin instant must be '0', not int 0.
cdc_read_options = {
    'hoodie.datasource.query.incremental.format': 'cdc',
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': '0',
}
spark.read.format("hudi"). \
    options(**cdc_read_options). \
    load(basePath). \
    show(10, False)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment