@cb149
Created August 12, 2021 23:03
// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._ // lets Seq[Row] convert implicitly to java.util.List[Row] for createDataFrame
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
// Initial table schema
val schema = StructType(Array(
  StructField("rowId", StringType, true),
  StructField("partitionId", StringType, true),
  StructField("preComb", LongType, true),
  StructField("name", StringType, true),
  StructField("versionId", StringType, true),
  StructField("toBeDeletedStr", StringType, true),
  StructField("intToLong", IntegerType, true),
  StructField("longToInt", LongType, true)
))
val data0 = Seq(Row("row_1", "part_0",0L,"bob","v_0","toBeDel0",0,1000000L),
Row("row_2", "part_0",0L,"john","v_0","toBeDel0",0,1000000L),
Row("row_3", "part_0",0L,"tom","v_0","toBeDel0",0,1000000L))
var dfFromData0 = spark.createDataFrame(data0,schema)
dfFromData0.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
// Read the table back as a snapshot and register it as a temp view
var tripsSnapshotDF1 = spark.
  read.
  format("hudi").
  load(basePath + "/*/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select rowId, partitionId, preComb, name, versionId, toBeDeletedStr, intToLong, longToInt from hudi_trips_snapshot").show()
// Evolved schema: the same fields plus a new nullable string column, newStrField
val schemaAddedField = StructType(Array(
  StructField("rowId", StringType, true),
  StructField("partitionId", StringType, true),
  StructField("preComb", LongType, true),
  StructField("name", StringType, true),
  StructField("versionId", StringType, true),
  StructField("toBeDeletedStr", StringType, true),
  StructField("intToLong", IntegerType, true),
  StructField("longToInt", LongType, true),
  StructField("newStrField", StringType, true)
))
val data5 = Seq(Row("row_2", "part_1",5L,"john","v_3","toBeDel3",3,2000000L,"newField_1"),
Row("row_5", "part_1",5L,"maroon","v_2","toBeDel3",2,2000000L, "newFiled_1"),
Row("row_9", "part_1",5L,"michael","v_2","toBeDel3",2,2000000L, "newFiled_1"))
var dfFromData5 = spark.createDataFrame(data5,schemaAddedField)
dfFromData5.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
// Refresh the snapshot view and query it again, this time including newStrField
tripsSnapshotDF1 = spark.
  read.
  format("hudi").
  load(basePath + "/*/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select rowId, partitionId, preComb, name, versionId, toBeDeletedStr, intToLong, longToInt, newStrField from hudi_trips_snapshot").show()