Skip to content

Instantly share code, notes, and snippets.

@ad1happy2go
Created March 5, 2024 17:13
Show Gist options
  • Save ad1happy2go/faebfdbf4db70d411e4ab89c7df77775 to your computer and use it in GitHub Desktop.
Save ad1happy2go/faebfdbf4db70d411e4ab89c7df77775 to your computer and use it in GitHub Desktop.
// DDL for a Hudi table ('type' = 'cow') keyed on `id`, with `ts` as the precombine
// field and change-data-capture enabled through 'hoodie.table.cdc.enabled'.
// The table's storage location is `basePath`.
// stripMargin removes everything up to and including each `|`, so the indentation
// below is purely cosmetic and the generated SQL text is unchanged.
val createTableDdl =
  s"""
     | create table $tableName (
     | id int,
     | name string,
     | price double,
     | ts long,
     | dt date
     | ) using hudi
     | tblproperties (
     | 'primaryKey' = 'id',
     | 'preCombineField' = 'ts',
     | 'hoodie.table.cdc.enabled' = 'true',
     | 'type' = 'cow'
     | )
     | location '$basePath'
     |""".stripMargin
spark.sql(createTableDdl)
// Seed the table with three rows that have distinct `dt` values, all written by a
// single INSERT statement, so the CDC read that follows has change records to return.
// The trailing `|` line keeps the stripMargin output identical to a closing quote at column 0.
val insertSeedRows =
  s"""
     | insert into $tableName values
     | (1, 'a1', 11, 1000, cast('2021-07-01' as date)),
     | (2, 'a2', 12, 1000, cast('2021-08-10' as date)),
     | (3, 'a3', 13, 1000, cast('2023-01-20' as date))
     |""".stripMargin
spark.sql(insertSeedRows)
// Read change records starting from instant 0.
// NOTE(review): `cdcDataFrame` is defined outside this snippet; it is presumably a helper
// that runs a Hudi CDC incremental query against `basePath` -- confirm its contract.
val cdcDataOnly1 = cdcDataFrame(basePath, 0)

// Reuse the table schema to parse the JSON row images in the `after` column, but read
// DATE columns as INT. The conversion below treats those integers as days since
// 1970-01-01 -- NOTE(review): confirm this matches how the CDC JSON encodes dates.
// `copy` keeps each field's nullability and metadata, which a fresh StructField would reset.
val schemaFields = spark.read.format("hudi").load(basePath).schema.map {
  case field if field.dataType == DateType => field.copy(dataType = IntegerType)
  case field => field
}
val schema = StructType(schemaFields)
cdcDataOnly1.show(false)

// Turn epoch days back into a DATE with plain calendar arithmetic. The earlier
// from_unixtime(86400 * dt + unix_timestamp('1970-01-01', ...)) form had two problems:
// it multiplied INT by INT, which overflows for dates after 2038-01-19, and it
// round-tripped through the session time zone. That round-trip can land on the previous
// day whenever the zone's UTC offset in 1970 differs from its offset on the target date.
val parsedDtExpr = "date_add(cast('1970-01-01' as date), dt) as parsed_dt"
cdcDataOnly1.withColumn("jsonData", from_json(col("after"), schema)).select("jsonData.*").selectExpr("*", parsedDtExpr).show(false)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment