@nsivabalan
Created June 15, 2021 18:37
hudi schema evolution
./spark-shell --packages org.apache.spark:spark-avro_2.12:3.0.0 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --jars /Users/sivabala/Documents/personal/projects/siva_hudi/apache_hudi_feb_2021/hudi/packaging/hudi-spark-bundle/target/hudi-spark3-bundle_2.12-0.8.0-SNAPSHOT.jar
// spark-shell (assumes the quickstart imports and val dataGen = new DataGenerator from org.apache.hudi.QuickstartUtils, plus tableName/basePath as defined in the next block)
val inserts = convertToStringList(dataGen.generateInserts(100))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
simple schema:
rowId: string
partitionId: string
preComb: long
name: string
versionId: string
toBeDeletedStr: string
intToLong: int
longToInt : long
laterAddedField: string
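A minimal Spark StructType sketch of the simple schema above (field names and types taken from the list; nullability and the laterAddedField type are assumptions):
import org.apache.spark.sql.types._

// sketch only: mirrors the field list above, all fields assumed nullable
val simpleSchema = StructType(Array(
  StructField("rowId", StringType, true),
  StructField("partitionId", StringType, true),
  StructField("preComb", LongType, true),
  StructField("name", StringType, true),
  StructField("versionId", StringType, true),
  StructField("toBeDeletedStr", StringType, true),
  StructField("intToLong", IntegerType, true),
  StructField("longToInt", LongType, true),
  StructField("laterAddedField", StringType, true)
))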
./spark-shell --packages org.apache.spark:spark-avro_2.12:3.0.0 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --jars /Users/sivabala/Documents/personal/projects/siva_hudi/apache_hudi_feb_2021/hudi/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.12-0.8.0-SNAPSHOT.jar
// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val schema = StructType( Array(
StructField("rowId", StringType,true),
StructField("partitionId", StringType,true),
StructField("preComb", LongType,true),
StructField("name", StringType,true),
StructField("versionId", StringType,true),
StructField("toBeDeletedStr", StringType,true),
StructField("intToLong", IntegerType,true),
StructField("longToInt", LongType,true)
))
val data0 = Seq(Row("row_1", "part_0",0L,"bob","v_0","toBeDel0",0,1000000L),
Row("row_2", "part_0",0L,"john","v_0","toBeDel0",0,1000000L),
Row("row_3", "part_0",0L,"tom","v_0","toBeDel0",0,1000000L))
var dfFromData0 = spark.createDataFrame(data0,schema)
dfFromData0.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "insert").
mode(Overwrite).
save(basePath)
//
val data_inserts_1 = Seq(
Row("row_4", "part_0",1L,"morley","v_0","toBeDel0",0,1000000L),
Row("row_5", "part_0",1L,"maroon","v_0","toBeDel0",0,1000000L),
Row("row_6", "part_0",1L,"happy","v_0","toBeDel0",0,1000000L))
var dfFromData1 = spark.createDataFrame(data_inserts_1,schema)
dfFromData1.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
var data_updates_2 = Seq(
Row("row_1", "part_0",2L,"bob","v_1","toBeDel1",1,1000000L),
Row("row_2", "part_0",2L,"john","v_1","toBeDel1",1,1000000L),
Row("row_7", "part_0",2L,"hanks","v_0","toBeDel0",0,1000000L))
var dfFromData2 = spark.createDataFrame(data_updates_2,schema)
dfFromData2.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
var tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select rowId, partitionId, preComb, name, versionId, toBeDeletedStr, intToLong, longToInt from hudi_trips_snapshot").show()
val data_inserts_10_1 = Seq(
Row("row_14", "part_0",1L,"morley1","v_1","toBeDel1",0,1000000L),
Row("row_15", "part_0",1L,"maroon1","v_1","toBeDel1",0,1000000L),
Row("row_16", "part_0",1L,"happy1","v_1","toBeDel1",0,1000000L))
var dfFromData10_1 = spark.createDataFrame(data_inserts_10_1,schema)
dfFromData10_1.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
val data_inserts_12_1 = Seq(
Row("row_34", "part_0",1L,"morley2","v_1","toBeDel1",0,1000000L),
Row("row_35", "part_0",1L,"maroon2","v_1","toBeDel1",0,1000000L),
Row("row_36", "part_0",1L,"happy2","v_1","toBeDel1",0,1000000L))
var dfFromData12_1 = spark.createDataFrame(data_inserts_12_1,schema)
dfFromData12_1.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
var data_updates = Seq(Row("row_1", "part_0",0L,"bob","v_3","toBeDel0",0,1000000L),
Row("row_2", "part_0",0L,"john","v_3","toBeDel0",0,1000000L),
Row("row_3", "part_0",0L,"tom","v_3","toBeDel0",0,1000000L))
var dfFromData1 = spark.createDataFrame(data_updates,schema)
dfFromData1.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "insert").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
mode(Append).
save(basePath)
// add a new field to schema
val schemaAddedField = StructType( Array(
StructField("rowId", StringType,true),
StructField("partitionId", StringType,true),
StructField("preComb", LongType,true),
StructField("name", StringType,true),
StructField("versionId", StringType,true),
StructField("toBeDeletedStr", StringType,true),
StructField("intToLong", IntegerType,true),
StructField("longToInt", LongType,true),
StructField("evolvedField", StringType, true)
))
// insert w/ evolved field.
val data4 = Seq(Row("row_8", "part_0",0L,"jerry","v_0","toBeDel0",0,1000000L,"newField_0"),
Row("row_9", "part_0",0L,"michael","v_0","toBeDel0",0,1000000L, "newFiled_0"),
Row("row_10", "part_0",0L,"robert","v_0","toBeDel0",0,1000000L, "newFiled_0"))
var dfFromData4 = spark.createDataFrame(data4,schemaAddedField)
dfFromData4.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select rowId, partitionId, preComb, name, versionId, toBeDeletedStr, intToLong, longToInt, evolvedField from hudi_trips_snapshot").show()
// update w/ evolved schema
val data5 = Seq(Row("row_2", "part_0",5L,"john","v_3","toBeDel3",3,2000000L,"newField_1"),
Row("row_3", "part_0",5L,"maroon","v_2","toBeDel3",2,2000000L, "newFiled_1"),
Row("row_9", "part_0",5L,"michael","v_2","toBeDel3",2,2000000L, "newFiled_1"))
var dfFromData5 = spark.createDataFrame(data5,schemaAddedField)
dfFromData5.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select rowId, partitionId, preComb, name, versionId, toBeDeletedStr, intToLong, longToInt, evolvedField from hudi_trips_snapshot").show()
--------------------------------------------------------------------------------
// update w/ evolved schema
val data5 = Seq(Row("row_1", "part_0",5L,"john","v_3","toBeDel3",3,2000000L,"newField_1"),
Row("row_2", "part_0",5L,"maroon","v_2","toBeDel3",2,2000000L, "newFiled_1"),
Row("row_3", "part_0",5L,"michael","v_2","toBeDel3",2,2000000L, "newFiled_1"))
var dfFromData5 = spark.createDataFrame(data5,schemaAddedField)
dfFromData5.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_NAME, tableName).
option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
mode(Append).
save(basePath)
spark.sql.hive.convertMetastoreParquet = false
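One way to apply this config from the shell (a sketch, not tied to a specific session above):
// at launch: spark-shell ... --conf spark.sql.hive.convertMetastoreParquet=false
// or on a running session:
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")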
var tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select rowId, partitionId, preComb, name, versionId, toBeDeletedStr, intToLong, longToInt, evolvedField from hudi_trips_snapshot").show()
Caused by: org.apache.avro.AvroTypeException: Found hoodie.hudi_trips_cow.hudi_trips_cow_record, expecting hoodie.hudi_trips_cow.hudi_trips_cow_record, missing required field evolvedField
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:130)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:215)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock.deserializeRecords(HoodieAvroDataBlock.java:165)
at org.apache.hudi.common.table.log.block.HoodieDataBlock.createRecordsFromContentBytes(HoodieDataBlock.java:128)
at org.apache.hudi.common.table.log.block.HoodieDataBlock.getRecords(HoodieDataBlock.java:106)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processDataBlock(AbstractHoodieLogRecordScanner.java:289)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processQueuedBlocksForInstant(AbstractHoodieLogRecordScanner.java:324)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:252)
... 24 more
21/03/25 11:27:03 WARN TaskSetManager: Lost task 0.0 in stage 83.0 (TID 667, sivabala-c02xg219jgh6.attlocal.net, executor driver): org.apache.hudi.exception.HoodieException: Exception when reading log file
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:261)
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:100)
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:93)
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:75)
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:230)
at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:328)
at org.apache.hudi.HoodieMergeOnReadRDD$$anon$3.<init>(HoodieMergeOnReadRDD.scala:210)
at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:200)
at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:77)
// MOR table
// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val schema = StructType( Array(
StructField("rowId", StringType,true),
StructField("partitionId", StringType,true),
StructField("preComb", LongType,true),
StructField("name", StringType,true),
StructField("versionId", StringType,true),
StructField("toBeDeletedStr", StringType,true),
StructField("intToLong", IntegerType,true),
StructField("longToInt", LongType,true)
))
val data0 = Seq(Row("row_1", "part_0",0L,"bob","v_0","toBeDel0",0,1000000L),
Row("row_2", "part_0",0L,"john","v_0","toBeDel0",0,1000000L),
Row("row_3", "part_0",0L,"tom","v_0","toBeDel0",0,1000000L))
var dfFromData0 = spark.createDataFrame(data0,schema)
dfFromData0.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_NAME, tableName).
option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
mode(Overwrite).
save(basePath)
//
val data_inserts_1 = Seq(
Row("row_4", "part_0",1L,"morley","v_0","toBeDel0",0,1000000L),
Row("row_5", "part_0",1L,"maroon","v_0","toBeDel0",0,1000000L),
Row("row_6", "part_0",1L,"happy","v_0","toBeDel0",0,1000000L))
var dfFromData1 = spark.createDataFrame(data_inserts_1,schema)
dfFromData1.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_NAME, tableName).
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
mode(Append).
save(basePath)
val data_updates_2 = Seq(
Row("row_1", "part_0",2L,"bob","v_1","toBeDel1",1,1000000L),
Row("row_2", "part_0",2L,"john","v_1","toBeDel1",1,1000000L),
Row("row_7", "part_0",2L,"hanks","v_0","toBeDel0",0,1000000L))
var dfFromData2 = spark.createDataFrame(data_updates_2,schema)
dfFromData2.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_NAME, tableName).
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
mode(Append).
save(basePath)
var tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*/*")
//load(basePath) use "/partitionKey=partitionValue" folder structure for Spark auto partition discovery
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select rowId, partitionId, preComb, name, versionId, toBeDeletedStr, intToLong, longToInt from hudi_trips_snapshot").show()
// add a new field to schema
val schemaAddedField = StructType( Array(
StructField("rowId", StringType,true),
StructField("partitionId", StringType,true),
StructField("preComb", LongType,true),
StructField("name", StringType,true),
StructField("versionId", StringType,true),
StructField("toBeDeletedStr", StringType,true),
StructField("intToLong", IntegerType,true),
StructField("longToInt", LongType,true),
StructField("evolvedField", StringType,true)
))
// insert w/ evolved field.
val data4 = Seq(Row("row_8", "part_0",0L,"jerry","v_0","toBeDel0",0,1000000L,"newField_0"),
Row("row_9", "part_0",0L,"michael","v_0","toBeDel0",0,1000000L, "newFiled_0"),
Row("row_10", "part_0",0L,"robert","v_0","toBeDel0",0,1000000L, "newFiled_0"))
var dfFromData4 = spark.createDataFrame(data4,schemaAddedField)
dfFromData4.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_NAME, tableName).
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
mode(Append).
save(basePath)
tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select rowId, partitionId, preComb, name, versionId, toBeDeletedStr, intToLong, longToInt from hudi_trips_snapshot").show()
// update w/ evolved schema
val data5 = Seq(Row("row_2", "part_0",5L,"john","v_3","toBeDel3",3,2000000L,"newField_1"),
Row("row_5", "part_0",5L,"maroon","v_2","toBeDel3",2,2000000L, "newFiled_1"),
Row("row_9", "part_0",5L,"michael","v_2","toBeDel3",2,2000000L, "newFiled_1"))
var dfFromData5 = spark.createDataFrame(data5,schemaAddedField)
dfFromData5.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_NAME, tableName).
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
mode(Append).
save(basePath)
tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select rowId, partitionId, preComb, name, versionId, toBeDeletedStr, intToLong, longToInt from hudi_trips_snapshot").show()
--------------------------------------------------------------------------------
// int to long. compatible schema evolution
val schemaIntToLong = StructType( Array(
StructField("rowId", StringType,true),
StructField("partitionId", StringType,true),
StructField("preComb", LongType,true),
StructField("name", StringType,true),
StructField("versionId", StringType,true),
StructField("toBeDeletedStr", StringType,true),
StructField("intToLong", LongType,true),
StructField("longToInt", LongType,true),
StructField("newStrField", StringType,true)
))
val data6 = Seq(Row("row_3", "part_0",6L,"tom","v_1","toBeDel6",3L,2000000L,"newField_1"),
Row("row_6", "part_0",6L,"happy","v_1","toBeDel6",3L,2000000L, "newFiled_1"),
Row("row_10", "part_0",6L,"robert","v_1","toBeDel6",2L,2000000L, "newFiled_1"))
var dfFromData6 = spark.createDataFrame(data6,schemaIntToLong)
dfFromData6.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select rowId, partitionId, preComb, name, versionId, toBeDeletedStr, intToLong, longToInt, newStrField from hudi_trips_snapshot").show()
// long to int schema evolution. non compatible evolution.
// just inserts.
val schemaLongToInt = StructType( Array(
StructField("rowId", StringType,true),
StructField("partitionId", StringType,true),
StructField("preComb", LongType,true),
StructField("name", StringType,true),
StructField("versionId", StringType,true),
StructField("toBeDeletedStr", StringType,true),
StructField("intToLong", LongType,true),
StructField("longToInt", IntegerType,true),
StructField("newStrField", StringType,true)
))
val data7 = Seq(Row("row_11", "part_0",7L,"tammy","v_0","toBeDel7",0L,1000,"newField_0"),
Row("row_12", "part_0",7L,"sam","v_0","toBeDel7",0L,1000, "newFiled_0"),
Row("row_13", "part_0",7L,"Ivan","v_0","toBeDel7",0L,1000, "newFiled_0"))
var dfFromData7 = spark.createDataFrame(data7,schemaLongToInt)
dfFromData7.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select rowId, partitionId, preComb, name, versionId, toBeDeletedStr, intToLong, longToInt, newStrField from hudi_trips_snapshot").show()
// omit a field. inserts.
val schemaOmitField = StructType( Array(
StructField("rowId", StringType,true),
StructField("partitionId", StringType,true),
StructField("preComb", LongType,true),
StructField("name", StringType,true),
StructField("versionId", StringType,true),
StructField("intToLong", LongType,true),
StructField("longToInt", LongType,true),
StructField("newStrField", StringType,true)
))
val data8 = Seq(Row("row_11", "part_0",7L,"tammy","v_0",0L,3000L,"newField_0"),
Row("row_12", "part_0",7L,"sam","v_0",0L,3000L, "newFiled_0"),
Row("row_13", "part_0",7L,"Ivan","v_0",0L,3000L, "newFiled_0"))
var dfFromData8 = spark.createDataFrame(data8,schemaOmitField)
dfFromData8.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select rowId, partitionId, preComb, name, versionId, toBeDeletedStr, intToLong, longToInt, newStrField from hudi_trips_snapshot").show()
https://youtu.be/-NcZmVRGB2Y
user1:
1000 acks
1000 pushes
user2:
20 acks
20 pushes
10 sessions
1/10
2/10
4/10
~0.5 push success rate
percentile
user_id, device_id, session_id ?
push_succ_rate,
ack_rate
expiry_rate
dedup_rate
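A rough Spark SQL sketch of how these rates could be grouped per session_id; the push_events table and its state column are hypothetical placeholders, not an existing schema:
val rates = spark.sql("""
  SELECT session_id,
         SUM(CASE WHEN state = 'pushed_acked' THEN 1 ELSE 0 END) / CAST(COUNT(*) AS DOUBLE) AS push_succ_rate,
         SUM(CASE WHEN state LIKE '%acked%'   THEN 1 ELSE 0 END) / CAST(COUNT(*) AS DOUBLE) AS ack_rate,
         SUM(CASE WHEN state LIKE '%expired%' THEN 1 ELSE 0 END) / CAST(COUNT(*) AS DOUBLE) AS expiry_rate,
         SUM(CASE WHEN state LIKE '%deduped%' THEN 1 ELSE 0 END) / CAST(COUNT(*) AS DOUBLE) AS dedup_rate
  FROM push_events
  GROUP BY session_id
""")
rates.show()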
3 preCombine requests/issues
CSS Kafka: can we take it up and get it in.
- custom deser to set right schema.
- adding kafka meta columns
Put up a page in hudi for diff versions supported in EMR?
perf metadata table?
0.8 release?
Blog: on small file handling.
fivetran, starburst ->
onClose()
  if (OK)
    code_block1
    callSuccess = true;
  else if (NOT_FOUND and header present)
    handle redirect
  else
    code_block1
    if status == DEADLINE_EXCEEDED or status == UNAVAILABLE
      callSuccess = true
    else
      callSuccess = false

if (!NOT_FOUND) { // NOT_FOUND is used for redirects, so any other status code is the happy path.
  if status == DEADLINE_EXCEEDED or status == UNAVAILABLE
    callSuccess = false
  else
    callSuccess = true
  failOverInterceptor.setCallSuccess(callSuccess);
  closeCall(status, trailers);
} else { // if status == NOT_FOUND
  if (header present) {
    if (valid header to redirect) {
      // handle redirect
    } else {
      // throw an exception ...
    }
  } else {
    failOverInterceptor.setCallSuccess(true);
    closeCall(status, trailers);
  }
}
failover event handler:
response feedback.
response event: succeeded/failed.
failed: n/w error. request couldn't reach the server.
any other error is considered a success.
// check if it's a redirect
if (NOT_FOUND and header present)
  do redirect
else
  // close this call.
  if status == DEADLINE_EXCEEDED or status == UNAVAILABLE
    callSuccess = true
  else
    callSuccess = false
t0: msg_0 : added to queue.
t1: msg_1 (of same type) -> dedup msg_0 (emit a metric msg_0 is deduped)
msg_1 will be added to queue
t2: msg_2(of same type) -> dedup msg_1 (emit a metric msg_1 is deduped)
msg_2 will be added to queue
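A minimal Scala sketch of the dedup behaviour described above (types and the metric hook are hypothetical): a new message of the same type replaces the pending one, and a dedup metric is emitted for the older one.
import scala.collection.mutable

case class Msg(id: String, msgType: String)

class DedupQueue(emitDedupMetric: Msg => Unit) {
  private val pendingByType = mutable.Map[String, Msg]()

  def enqueue(msg: Msg): Unit = {
    // e.g. at t1, msg_1 dedups msg_0 of the same type and a metric is emitted for msg_0
    pendingByType.get(msg.msgType).foreach(emitDedupMetric)
    // msg_1 becomes the pending (queued) message for that type
    pendingByType(msg.msgType) = msg
  }
}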
(total - expired - pushed_expired)/ total_msgs
total_msgs = pushed_only + pushed_acked + pushed_expired + expired_only
1 - ()
CASE msg.action.action_name
WHEN 'streamgate.push' THEN 1
WHEN 'streamgate.bgpoll' THEN 1
ELSE 0
END as push_count_raw,
kapil: to verify if expiry is proactive or reactive.
if reactive, push_only could be more.
TC w/ 54 stock price: 600k.
2020: 187 + 37 + 254 = 478.
4500 shares.
$45 = 426k
$60 = 494k
2021: 206 + 39.8+(291/4)+280 = 598
4950 shares + 700 = 4950 + 700 = 5650
$45 = 500k
$55 = 556k
$60 = 585k
No of shares : 93000/4 + 14500 = 37750/54 = ~ 700
Linda:
5B range:
553k. equity: $54: 5700 units.
2x:
1.4x: exceeds. some are 1.6x.
1.2x: mid high. : considered very high.
sanjeev:
top top: 6% (5 people)
exceptional: 11. 13%
mid high 22 people. 25%. myself.
- % of sessions which get blocked on OAuth calls to backend
- for blocked sessions, latency spike for endpoints (bootstrap, app-launch, getmarketplace). In other words, compare latencies for these endpoints across sessions w/ blocked call vs not.
- Distribution of retry counts for Oauth backend fetches.
- OAuth backend fetch token fails.
- frequency for logged out sessions.
Core metrics:
1. push overall rate/delivery rate
// is overall system better or worse
numerator:
pushed_acked
denominator:
pending_only + pushed_only + pushed_expired + pushed_deduped + expired_only + deduped_only
distribution across sessions if feasible or deviceIds.
2. Distribution of msgs across diff states.
3. push_succ_rate, expiry rate, dedup_rate.
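Metric 1 above, transcribed literally as a formula (the names are the states listed above; values would come from the pipeline):
def deliveryRate(pushedAcked: Long, pendingOnly: Long, pushedOnly: Long, pushedExpired: Long,
                 pushedDeduped: Long, expiredOnly: Long, dedupedOnly: Long): Double =
  pushedAcked.toDouble /
    (pendingOnly + pushedOnly + pushedExpired + pushedDeduped + expiredOnly + dedupedOnly)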
spark-shell \
--packages org.apache.spark:spark-avro_2.12:3.0.1 \
--jars /Users/sivabala/Documents/personal/projects/siva_hudi/hudi_march2021/hudi/packaging/hudi-spark3-bundle/target/hudi-spark3-bundle_2.12-0.8.0-SNAPSHOT.jar \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
mode(Overwrite).
save(basePath)
val tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath + "/*/*/*/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
why does mouse-over show 2.11 even though I build for Scala 2.12?
Cannot resolve org.apache.hudi:hudi-spark_2.12:0.8.0-SNAPSHOT
Does hudi-spark include both hudi-spark2 and hudi-spark3? Why? Is hudi-spark what gets packaged along with either hudi-spark2 or hudi-spark3, or are hudi-spark2 and hudi-spark3 the ones that get packaged in general?
Can you go over the bundling logic in the pom?
After packaging, how do I know which version artifacts are included,
i.e., which Scala and Spark version artifacts were picked up?
When we deploy the artifacts to staging, we generate twice and push twice, right? So how are the common artifacts handled here (Scala 2.11 and Scala 2.12)?
If it's all one and the same, but we push it again,
should we do it 3 times then?
scala 11 and spark2
scala 12 and spark2
scala 12 and spark3
then, no need for new bundle itself right?
try {
  client.greet();
  Assert.fail(); // should not reach here; an exception is expected
} catch (Exception e) {
  // assert on the exception type/message
}
session1
999(acks)
total msgs: 1000
delivery rate: acks / all msgs.
100%
session2
acks: 10
total msgs: 20
delivery rate: 50%
session3
acks: 5
total msgs: 20
delivery rate: 25%
999 + 10 + 5
1000 + 20 + 20
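Presumably the point here is the aggregate rate vs. an average of per-session rates; a quick check of the arithmetic:
val acks  = Seq(999L, 10L, 5L)
val total = Seq(1000L, 20L, 20L)
val aggregateRate = acks.sum.toDouble / total.sum                                              // 1014/1040 ≈ 0.975
val avgOfSessionRates = acks.zip(total).map { case (a, t) => a.toDouble / t }.sum / acks.size  // ≈ 0.58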
mvn clean package -DskipTests
sivabala-C02XG219JGH6:hudi sivabala$ ls hudi-spark-datasource/hudi-spark/target/ | grep SNAPSHOT.jar
hudi-spark_2.11-0.8.0-SNAPSHOT.jar
sivabala-C02XG219JGH6:hudi sivabala$ ls hudi-spark-datasource/hudi-spark2/target/ | grep SNAPSHOT.jar
hudi-spark2_2.11-0.8.0-SNAPSHOT.jar
sivabala-C02XG219JGH6:hudi sivabala$ ls hudi-spark-datasource/hudi-spark3/target/ | grep SNAPSHOT.jar
hudi-spark3_2.12-0.8.0-SNAPSHOT.jar
sivabala-C02XG219JGH6:hudi sivabala$ ls packaging/hudi-spark-bundle/target/ | grep SNAPSHOT.jar
hudi-spark2.4.4-bundle_2.11-0.8.0-SNAPSHOT.jar
original-hudi-spark2.4.4-bundle_2.11-0.8.0-SNAPSHOT.jar
sivabala-C02XG219JGH6:hudi sivabala$ ls hudi-utilities/target/ | grep SNAPSHOT.jar
hudi-utilities_2.11-0.8.0-SNAPSHOT.jar
mvn clean package -DskipTests -Dscala-2.12
sivabala-C02XG219JGH6:hudi sivabala$ ls hudi-spark-datasource/hudi-spark/target/ | grep SNAPSHOT.jar
hudi-spark_2.12-0.8.0-SNAPSHOT.jar
sivabala-C02XG219JGH6:hudi sivabala$ ls hudi-spark-datasource/hudi-spark2/target/ | grep SNAPSHOT.jar
hudi-spark2_2.12-0.8.0-SNAPSHOT.jar
sivabala-C02XG219JGH6:hudi sivabala$ ls hudi-spark-datasource/hudi-spark3/target/ | grep SNAPSHOT.jar
hudi-spark3_2.12-0.8.0-SNAPSHOT.jar
sivabala-C02XG219JGH6:hudi sivabala$ ls packaging/hudi-spark-bundle/target/ | grep SNAPSHOT.jar
hudi-spark2-bundle_2.12-0.8.0-SNAPSHOT.jar
original-hudi-spark2-bundle_2.12-0.8.0-SNAPSHOT.jar
sivabala-C02XG219JGH6:hudi sivabala$ ls hudi-utilities/target/ | grep SNAPSHOT.jar
hudi-utilities_2.12-0.8.0-SNAPSHOT.jar
mvn clean package -DskipTests -Dspark3 -Dscala-2.12
sivabala-C02XG219JGH6:hudi sivabala$ ls hudi-spark-datasource/hudi-spark/target/ | grep SNAPSHOT.jar
hudi-spark_2.12-0.8.0-SNAPSHOT.jar
sivabala-C02XG219JGH6:hudi sivabala$ ls hudi-spark-datasource/hudi-spark2/target/ | grep SNAPSHOT.jar
hudi-spark2_2.12-0.8.0-SNAPSHOT.jar
sivabala-C02XG219JGH6:hudi sivabala$ ls hudi-spark-datasource/hudi-spark3/target/ | grep SNAPSHOT.jar
hudi-spark3_2.12-0.8.0-SNAPSHOT.jar
sivabala-C02XG219JGH6:hudi sivabala$ ls packaging/hudi-spark-bundle/target/ | grep SNAPSHOT.jar
hudi-spark3-bundle_2.12-0.8.0-SNAPSHOT.jar
original-hudi-spark3-bundle_2.12-0.8.0-SNAPSHOT.jar
sivabala-C02XG219JGH6:hudi sivabala$ ls hudi-utilities/target/ | grep SNAPSHOT.jar
hudi-utilities_2.12-0.8.0-SNAPSHOT.jar
spark-shell \
--packages org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:2.4.4 \
--jars /Users/sivabala/Documents/personal/projects/siva_hudi/gcs/jars/gcs-connector-hadoop2-2.1.4.jar \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf spark.hadoop.google.cloud.auth.service.account.enable=true \
--conf spark.hadoop.google.cloud.auth.service.account.json.keyfile=/Users/sivabala/Documents/personal/projects/siva_hudi/gcs/keys/march2021/sunlit-tea-306711-67fd625f7e5b.json
// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips_cow"
val basePath = "gs://tmp/hudi_trips_cow"
val dataGen = new DataGenerator
val conf = sc.hadoopConfiguration
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
val tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath + "/*/*/*/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
(3) The property was transferred on ___, 2021
(9) This statement is executed on ______
manage async operations
stats / monitoring.
freshness
file sizes
Basically you need to write unit tests with the below mindset.
When you walk through the source code, for every line, especially around if/else and exception-handling cases, we need to ask whether we have a unit test covering it. For example, if we have an if/else block, both branches should have tests. All inputs should be generated and sent from the test. All components that the class under test interacts with can be mocked out depending on convenience. But every output and every action (that is visible outside the class) taken within the class under test should be verified.
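A toy illustration of that mindset (hypothetical code, unrelated to the interceptor): each branch of the if/else gets its own test, and only externally visible output is asserted.
// hypothetical function under test
def classify(status: Int): String =
  if (status == 404) "redirect-candidate" else "normal"

// one assertion per branch
assert(classify(404) == "redirect-candidate")
assert(classify(200) == "normal")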
Let me take a stab at FailoverInterceptor in general.
This interceptor's purpose is to inject the right hostname and redirect if required.
- So, we need to have tests to ensure the right hostname is picked.
- Update hostname from eventhandler and ensure new requests pick the updated value.
- But also, the interceptor posts events to the eventHandler, so every event needs to be verified for its value. This is critical because the entire state machine depends on these response and redirect events. If the response event for some reason reports all successes, the failover state machine will never update its hosts. So we need to verify that for every response received with a different status code, the right response event is posted.
- The interceptor also checks for network connectivity and sends back a response immediately if there is no network. So, apart from verifying that the caller gets the right response back, you should also assert that no event has been posted to the eventHandler.
- Then comes the redirect. There are a lot of different scenarios we can test here; we have already gone through this to a reasonable extent. But here too, every response event and redirect event should be verified for its values. For instance, the redirect event should have the right values set, and if redirected multiple times, the redirect event has to be verified every time, as these are very important for the correctness of the failover state machine.
cluster: gcp-test-cluster
bucket: dataproc-staging-us-east4-673219471497-er2y0khx
gcloud config set project sunlit-tea-306711
gcloud config set dataproc/region us-east4
gcloud dataproc clusters list
/home/n_siva_b/hudi-spark-bundle_2.11-0.8.0-SNAPSHOT.jar
// not sure if this is really required; next time, try just the Google Chrome command (next section)
gcloud config set project sunlit-tea-306711
gcloud config set dataproc/region us-east4
gcloud dataproc clusters list
from local terminal:
gcloud compute ssh gcp-test-cluster-m --project=sunlit-tea-306711 --zone=us-east4-b -- -D 1080 -N
gcloud compute ssh gcp-sample-m --project=sample-project-gcp-306919 --zone=us-east1-b -- -D 1080 -N
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome" \
--proxy-server="socks5://localhost:1080" \
--user-data-dir=/tmp/gcp-test-cluster-m
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome" \
--proxy-server="socks5://localhost:1080" \
--user-data-dir=/tmp/gcp-sample-m
After this, go to http://gcp-test-cluster-m:8088/cluster in the new browser instance for the YARN UI (it will actually launch automatically).
Click on the latest Spark job's UI after launching spark-shell.
https://console.cloud.google.com/logs/query;query=resource.type%3D%22cloud_dataproc_cluster%22%0Aresource.labels.cluster_name%3D%22gcp-test-cluster%22%0Aresource.labels.cluster_uuid%3D%22d5e3472e-afe0-4116-9839-babbe5bf7392%22?project=sunlit-tea-306711
https://console.cloud.google.com/dataproc/clusters?project=sunlit-tea-306711
// copy hudi spark bundle
spark-shell \
--packages org.apache.spark:spark-avro_2.12:3.0.0 \
--jars /home/n_siva_b/hudi-spark-bundle_2.12-0.8.0-SNAPSHOT.jar \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips_cow"
val basePath = "gs://dataproc-staging-us-east4-673219471497-er2y0khx/tmp/hudi_trips_cow"
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(10))
var df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
mode(Overwrite).
save(basePath)
var tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath + "/*/*/*/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
var updates = convertToStringList(dataGen.generateUpdates(10))
df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
mode(Append).
save(basePath)
tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath + "/*/*/*/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
updates = convertToStringList(dataGen.generateUpdates(10))
df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
mode(Append).
save(basePath)
tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath + "/*/*/*/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
master branch
FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
LOG.warn("HoodieLogFileReader :: canonical name :: " + fsDataInputStream.getClass().getCanonicalName() + ", name "
+ fsDataInputStream.getClass().getName());
if (FSUtils.isGCSInputStream(fsDataInputStream)) {
LOG.warn("HoodieLogFileReader :: 111 start GCSFileSystem " + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) ((
(FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream()), bufferSize)));
LOG.warn("HoodieLogFileReader :: 111 completed ");
} else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
LOG.warn("HoodieLogFileReader :: 222 start " + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
LOG.warn("HoodieLogFileReader :: 222 complete");
} else {
LOG.warn("HoodieLogFileReader :: 333 ");
// fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
// need to wrap in another BufferedFSInputStream the make bufferSize work?
this.inputStream = fsDataInputStream;
}
"HoodieLogFileReader :: canonical name :: org.apache.hadoop.fs.FSDataInputStream, name org.apache.hadoop.fs.FSDataInputStream"
"HoodieLogFileReader :: 111 start GCSFileSystem com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream"
Caused by: java.lang.ClassCastException: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream cannot be cast to org.apache.hadoop
.fs.FSDataInputStream
at org.apache.hudi.common.table.log.HoodieLogFileReader.<init>(HoodieLogFileReader.java:84)
at org.apache.hudi.common.table.log.HoodieLogFormatReader.<init>(HoodieLogFormatReader.java:62)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:131)
... 24 more
2nd variant. this PR in its current state.
if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
LOG.warn("HoodieLogFileReader 1111 " + logFile.getFileName() + " " + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
inputStreamLocal = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
} else if (FSUtils.isGCSFileSystem(fs)) {
LOG.warn("HoodieLogFileReader 2222 aaa " + logFile.getFileName() + " " + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
try {
FSInputStream localFSInputStream = (FSInputStream)(((FSDataInputStream)fsDataInputStream.getWrappedStream()).getWrappedStream());
inputStreamLocal = new SchemeAwareFSDataInputStream(new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
new BufferedFSInputStream(localFSInputStream,bufferSize))), true);
LOG.warn("HoodieLogFileReader 2222 aaa succeeded " + logFile.getFileName());
} catch (ClassCastException e) {
Log.warn("HoodieLogFileReader 2222 bbb (aaa failed) " + logFile.getFileName() + " " + e.getCause()
+ ", msg " + e.getMessage());
// if we cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream, fallback to using as is
LOG.warn("Cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream with GCSFileSystem, falling back to original "
+ "fsDataInputStream");
inputStreamLocal = fsDataInputStream;
}
} else {
// fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
// need to wrap in another BufferedFSInputStream the make bufferSize work?
LOG.warn("HoodieLogFileReader 3333 " + logFile.getFileName());
inputStreamLocal = fsDataInputStream;
}
"HoodieLogFileReader 1111 .0d7ba334-2847-4b24-997e-1dbecfd12e3b-0_20210306132835.log.1_0-55-75 com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream"
3rd variant
if (FSUtils.isGCSFileSystem(fs)) {
LOG.warn("HoodieLogFileReader 111 aaa " + logFile.getFileName() + " can_name: " + fsDataInputStream.getWrappedStream().getClass().getCanonicalName()
+ ". Is wrappedStream instance of fsDataInputStream " + (fsDataInputStream.getWrappedStream() instanceof FSDataInputStream)
+ " , is wrappedSTream instance of fsInputStream " + (fsDataInputStream.getWrappedStream() instanceof FSInputStream));
try {
FSInputStream localFSInputStream = (FSInputStream)(((FSDataInputStream)fsDataInputStream.getWrappedStream()).getWrappedStream());
inputStreamLocal = new SchemeAwareFSDataInputStream(new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
new BufferedFSInputStream(localFSInputStream,bufferSize))), true);
LOG.warn("HoodieLogFileReader 111 aaa succeeded " + logFile.getFileName());
} catch (ClassCastException e) {
LOG.warn("HoodieLogFileReader 111 bbb (aaa failed) " + logFile.getFileName() + " " + e.getCause()
+ ", msg " + e.getMessage());
// if we cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream, fallback to using as is
LOG.warn("Cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream with GCSFileSystem, falling back to original "
+ "fsDataInputStream");
inputStreamLocal = fsDataInputStream;
}
} else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
LOG.warn("HoodieLogFileReader 222 " + logFile.getFileName() + " " + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
inputStreamLocal = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
LOG.warn("HoodieLogFileReader 222 completed ");
} else {
// fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
// need to wrap in another BufferedFSInputStream the make bufferSize work?
LOG.warn("HoodieLogFileReader 3333 " + logFile.getFileName());
inputStreamLocal = fsDataInputStream;
}
"HoodieLogFileReader 111 aaa .978be663-e43b-427e-a102-f26066b15776-0_20210306140026.log.1_0-55-76 can_name: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream. Is wrappedStream instance of fsDataInputStream false , is wrappedSTream instance of fsInputStream true"
"HoodieLogFileReader 111 bbb (aaa failed) .978be663-e43b-427e-a102-f26066b15776-0_20210306140026.log.1_0-55-76 null, msg com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream cannot be cast to org.apache.hadoop.fs.FSDataInputStream"
"Cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream with GCSFileSystem, falling back to original fsDataInputStream"
-b.c.sunlit-tea-306711.internal executor 2): org.apache.hudi.exception.HoodieException: Exception when reading log file
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:250)
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:100)
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:93)
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:75)
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:230)
at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:330)
at org.apache.hudi.HoodieMergeOnReadRDD$$anon$3.<init>(HoodieMergeOnReadRDD.scala:213)
at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:203)
at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:81)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieIOException: IOException when reading logblock from log file HoodieLogFile{pathStr='gs://d
ataproc-staging-us-east4-673219471497-er2y0khx/tmp/hudi_trips_cow/americas/brazil/sao_paulo/.978be663-e43b-427e-a102-f26066b15776-0_2
0210306140026.log.1_0-55-76', fileLen=0}
at org.apache.hudi.common.table.log.HoodieLogFileReader.next(HoodieLogFileReader.java:375)
at org.apache.hudi.common.table.log.HoodieLogFormatReader.next(HoodieLogFormatReader.java:114)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:140)
... 24 more
Caused by: java.io.EOFException: Invalid seek offset: position value (1584) must be between 0 and 1584 for 'gs://dataproc-staging-us-
east4-673219471497-er2y0khx/tmp/hudi_trips_cow/americas/brazil/sao_paulo/.978be663-e43b-427e-a102-f26066b15776-0_20210306140026.log.1
_0-55-76'
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.validatePosition(Google
CloudStorageReadChannel.java:653)
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.position(GoogleCloudSto
rageReadChannel.java:535)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.seek(GoogleHadoopFSInputStream.java:178)
at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:65)
at org.apache.hudi.common.table.log.block.HoodieLogBlock.readOrSkipContent(HoodieLogBlock.java:222)
at org.apache.hudi.common.table.log.HoodieLogFileReader.createCorruptBlock(HoodieLogFileReader.java:251)
at org.apache.hudi.common.table.log.HoodieLogFileReader.readBlock(HoodieLogFileReader.java:171)
at org.apache.hudi.common.table.log.HoodieLogFileReader.next(HoodieLogFileReader.java:373)
... 26 more
21/03/06 14:01:02 ERROR org.apache.spark.scheduler.TaskSetManager: Task 0 in stage 64.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 64.0 failed 4 times, most recent failure: Lost tas
k 0.3 in stage 64.0 (TID 88) (gcp-test-cluster-w-1.us-east4-b.c.sunlit-tea-306711.internal executor 1): org.apache.hudi.exception.Hoo
dieException: Exception when reading log file
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:250)
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:100)
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:93)
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:75)
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:230)
at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:330)
at org.apache.hudi.HoodieMergeOnReadRDD$$anon$3.<init>(HoodieMergeOnReadRDD.scala:213)
at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:203)
at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:81)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieIOException: IOException when reading logblock from log file HoodieLogFile{pathStr='gs://d
ataproc-staging-us-east4-673219471497-er2y0khx/tmp/hudi_trips_cow/americas/brazil/sao_paulo/.978be663-e43b-427e-a102-f26066b15776-0_2
0210306140026.log.1_0-55-76', fileLen=0}
at org.apache.hudi.common.table.log.HoodieLogFileReader.next(HoodieLogFileReader.java:375)
at org.apache.hudi.common.table.log.HoodieLogFormatReader.next(HoodieLogFormatReader.java:114)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:140)
... 24 more
Caused by: java.io.EOFException: Invalid seek offset: position value (1584) must be between 0 and 1584 for 'gs://dataproc-staging-us-
east4-673219471497-er2y0khx/tmp/hudi_trips_cow/americas/brazil/sao_paulo/.978be663-e43b-427e-a102-f26066b15776-0_20210306140026.log.1
_0-55-76'
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.validatePosition(Google
CloudStorageReadChannel.java:653)
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.position(GoogleCloudSto
rageReadChannel.java:535)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.seek(GoogleHadoopFSInputStream.java:178)
at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:65)
at org.apache.hudi.common.table.log.block.HoodieLogBlock.readOrSkipContent(HoodieLogBlock.java:222)
at org.apache.hudi.common.table.log.HoodieLogFileReader.createCorruptBlock(HoodieLogFileReader.java:251)
at org.apache.hudi.common.table.log.HoodieLogFileReader.readBlock(HoodieLogFileReader.java:171)
at org.apache.hudi.common.table.log.HoodieLogFileReader.next(HoodieLogFileReader.java:373)
... 26 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2254)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2203)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2202)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2202)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2441)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2383)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2372)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
at org.apache.spark.sql.Dataset.show(Dataset.scala:825)
at org.apache.spark.sql.Dataset.show(Dataset.scala:784)
at org.apache.spark.sql.Dataset.show(Dataset.scala:793)
... 59 elided
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:250)
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:100)
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:93)
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:75)
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:230)
at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:330)
at org.apache.hudi.HoodieMergeOnReadRDD$$anon$3.<init>(HoodieMergeOnReadRDD.scala:213)
at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:203)
at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:81)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieIOException: IOException when reading logblock from log file HoodieLogFile{pathStr='gs://dataproc-staging-us-east4-673219471497-er2y0khx/tmp/hudi_trips_cow/americas/brazil/sao_paulo/.978be663-e43b-427e-a102-f26066b15776-0_20210306140026.log.1_0-55-76', fileLen=0}
at org.apache.hudi.common.table.log.HoodieLogFileReader.next(HoodieLogFileReader.java:375)
at org.apache.hudi.common.table.log.HoodieLogFormatReader.next(HoodieLogFormatReader.java:114)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:140)
... 24 more
Caused by: java.io.EOFException: Invalid seek offset: position value (1584) must be between 0 and 1584 for 'gs://dataproc-staging-us-east4-673219471497-er2y0khx/tmp/hudi_trips_cow/americas/brazil/sao_paulo/.978be663-e43b-427e-a102-f26066b15776-0_20210306140026.log.1_0-55-76'
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.validatePosition(GoogleCloudStorageReadChannel.java:653)
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.position(GoogleCloudStorageReadChannel.java:535)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.seek(GoogleHadoopFSInputStream.java:178)
at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:65)
at org.apache.hudi.common.table.log.block.HoodieLogBlock.readOrSkipContent(HoodieLogBlock.java:222)
at org.apache.hudi.common.table.log.HoodieLogFileReader.createCorruptBlock(HoodieLogFileReader.java:251)
at org.apache.hudi.common.table.log.HoodieLogFileReader.readBlock(HoodieLogFileReader.java:171)
at org.apache.hudi.common.table.log.HoodieLogFileReader.next(HoodieLogFileReader.java:373)
4th variant of the HoodieLogFileReader input-stream wrapping (GCS special-cased first):
if (FSUtils.isGCSFileSystem(fs)) {
LOG.warn("HoodieLogFileReader 111 aaa " + logFile.getFileName() + " can_name: " + fsDataInputStream.getWrappedStream().getClass().getCanonicalName()
+ ". Is wrappedStream instance of fsDataInputStream " + (fsDataInputStream.getWrappedStream() instanceof FSDataInputStream)
+ " , is wrappedSTream instance of fsInputStream " + (fsDataInputStream.getWrappedStream() instanceof FSInputStream));
try {
inputStreamLocal = new SchemeAwareFSDataInputStream(new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
new BufferedFSInputStream((FSInputStream)fsDataInputStream.getWrappedStream(),bufferSize))), true);
LOG.warn("HoodieLogFileReader 111 aaa succeeded " + logFile.getFileName());
} catch (ClassCastException e) {
LOG.warn("HoodieLogFileReader 111 bbb (aaa failed) " + logFile.getFileName() + " " + e.getCause()
+ ", msg " + e.getMessage());
// if we cannot cast fsDataInputStream.getWrappedStream() to FSInputStream, fall back to using it as is
LOG.warn("Cannot cast fsDataInputStream.getWrappedStream() to FSInputStream with GCSFileSystem, falling back to original "
+ "fsDataInputStream");
inputStreamLocal = fsDataInputStream;
}
} else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
LOG.warn("HoodieLogFileReader 222 " + logFile.getFileName() + " " + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
inputStreamLocal = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
LOG.warn("HoodieLogFileReader 222 completed ");
} else {
// fsDataInputStream.getWrappedStream() may be a BufferedFSInputStream
// need to wrap in another BufferedFSInputStream to make bufferSize work?
LOG.warn("HoodieLogFileReader 3333 " + logFile.getFileName());
inputStreamLocal = fsDataInputStream;
}
"HoodieLogFileReader 111 aaa .1efaf945-bfb2-40ac-bb93-2dadfdbcb728-0_20210306140833.log.1_1-55-75 can_name: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream. Is wrappedStream instance of fsDataInputStream false , is wrappedSTream instance of fsInputStream true"
"HoodieLogFileReader 111 aaa succeeded .1efaf945-bfb2-40ac-bb93-2dadfdbcb728-0_20210306140833.log.1_1-55-75"
5th variant: wrap with BufferedFSInputStream first, then with SchemeAwareFSDataInputStream for GCS:
if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
LOG.warn("HoodieLogFileReader 111 start " + logFile.getFileName());
inputStreamLocal = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
LOG.warn("HoodieLogFileReader 111 completed ");
if (FSUtils.isGCSFileSystem(fs)) {
LOG.warn("HoodieLogFileReader 222 GCS. Wrapping with SchemeAwareFSDataInputStream");
inputStreamLocal = new SchemeAwareFSDataInputStream(inputStreamLocal, true);
}
} else {
// fsDataInputStream.getWrappedStream() may be a BufferedFSInputStream
// need to wrap in another BufferedFSInputStream to make bufferSize work?
LOG.warn("HoodieLogFileReader 3333 " + logFile.getFileName());
inputStreamLocal = fsDataInputStream;
}
"HoodieLogFileReader 111 start .7a1a0684-b710-4a44-97c4-4c98b75db8a2-0_20210306142209.log.1_2-55-76"
"HoodieLogFileReader 111 completed "
"HoodieLogFileReader 222 GCS. Wrapping with SchemeAwareFSDataInputStream"
SparkMergeHelper: new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetafields());
public GenericDatumReader(Schema writer, Schema reader) {
writer: the old schema with which the record was written.
reader: the new (updated) schema to read into.
Go through SparkMergeHelper.
But why the code below in SparkMergeHelper?
if externalSchemaTransformation {
readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema();
gWriter = new GenericDatumWriter<>(readSchema);
gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetafields());
}
The else block makes sense:
gReader = null;
gWriter = null;
readSchema = mergeHandle.getWriterSchemaWithMetafields();
HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.<GenericRecord>getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
readerIterator = reader.getRecordIterator(readSchema);
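To make the writer/reader ordering concrete, here is a minimal sketch in plain Avro (not Hudi code; the record and field names are made up) showing why the old schema goes in the writer slot and the evolved schema in the reader slot: a record encoded with the old schema is decoded with GenericDatumReader(writerSchema, readerSchema), and the field that only exists in the reader schema is filled in from its default.
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
// old (writer) schema: only rowId
val writerSchema = new Schema.Parser().parse("""{"type":"record","name":"rec","fields":[{"name":"rowId","type":"string"}]}""")
// evolved (reader) schema: adds a nullable field with a default
val readerSchema = new Schema.Parser().parse("""{"type":"record","name":"rec","fields":[{"name":"rowId","type":"string"},{"name":"laterAddedField","type":["null","string"],"default":null}]}""")
// encode one record with the old schema
val rec = new GenericData.Record(writerSchema)
rec.put("rowId", "row_1")
val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get().binaryEncoder(out, null)
new GenericDatumWriter[GenericRecord](writerSchema).write(rec, encoder)
encoder.flush()
// decode with (writerSchema, readerSchema): Avro resolves the two schemas and
// fills laterAddedField from the reader-schema default (null here)
val decoder = DecoderFactory.get().binaryDecoder(out.toByteArray, null)
val evolved = new GenericDatumReader[GenericRecord](writerSchema, readerSchema).read(null, decoder)
evolved.get("laterAddedField") // null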
setEnabled in okhttp3. mimic in grpc.
mimic modules.
rebase and add interceptor factory
test w/ local device.
mdinga@sbcglobal.net
1. Will start reviewing schema-related PRs in a day or two.
5. Kafka timestamp PR.
6. Need to consolidate the partial-update PRs into just one.
Have asked Vlad if he can work on the feedback; if not, I will take it up tomorrow or the day after.
- Had to spend time attending to issues as well.
- One issue: Hudi 0.6.0, Spark 2 to Spark 3, went from 6 mins to 3.4 hrs. Might spend some time on this once he responds.
2. spark3 bundle clarification.
3. logo: timeline ?
4. Confirm H4 and H4 EAD filing as well. what happens if I switch companies after my H1B approval, but my wife's H4 (and/or) H4 EAD is still pending.
1. NetworkClassifier.
NetworkClassificationStream.LatencyBand -> very widely in driver.
2.
./spark-shell --packages org.apache.spark:spark-avro_2.12:3.0.0 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --jars /Users/sivabala/Documents/personal/projects/siva_hudi/apache_hudi_feb_2021/hudi/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.12-0.8.0-SNAPSHOT.jar
./spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.6 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --jars /Users/sivabala/Documents/personal/projects/siva_hudi/hudi_march2021/hudi/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-0.8.0-SNAPSHOT.jar
./spark-shell --packages org.apache.spark:spark-avro_2.12:2.4.6 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --jars /Users/sivabala/Documents/personal/projects/siva_hudi/apache_hudi_feb_2021/hudi/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.12-0.8.0-SNAPSHOT.jar
./spark-shell --packages org.apache.spark:spark-avro_2.12:3.0.1 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --jars /Users/sivabala/Documents/personal/projects/siva_hudi/hudi_march2021/hudi/packaging/hudi-spark-bundle/target/hudi-spark3-bundle_2.12-0.8.0-SNAPSHOT.jar
./spark-shell --packages org.apache.spark:spark-avro_2.12:3.0.1 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --jars /Users/sivabala/Documents/personal/projects/siva_hudi/apache_hudi_feb_2021/hudi/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.12-0.8.0-SNAPSHOT.jar
101251639
<profiles>
  <profile>
    <id>dev</id>
    <activation>
      <activeByDefault>true</activeByDefault>
    </activation>
    <properties>
      <!-- potential properties here -->
    </properties>
  </profile>
  <profile>
    <id>notests</id>
    <properties>
      <skipTests>true</skipTests>
    </properties>
  </profile>
</profiles>
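A hedged usage note for the profiles above: with this block in the project pom, running the build with the notests profile should set skipTests=true and skip the surefire tests, while the dev profile stays active by default.
mvn clean install -Pnotests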
[INFO] Running org.apache.hudi.table.upgrade.TestUpgradeDowngrade
Formatting using clusterid: testClusterID
Formatting using clusterid: testClusterID
Formatting using clusterid: testClusterID
Formatting using clusterid: testClusterID
Formatting using clusterid: testClusterID
Formatting using clusterid: testClusterID
Formatting using clusterid: testClusterID
Formatting using clusterid: testClusterID
Formatting using clusterid: testClusterID
[ERROR] Tests run: 9, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 90.248 s <<< FAILURE! - in org.apache.hudi.table.upgrade.TestUpgradeDowngrade
[ERROR] org.apache.hudi.table.upgrade.TestUpgradeDowngrade.testDowngrade(boolean,HoodieTableType) Time elapsed: 0.699 s <<< ERROR!
java.lang.NullPointerException
at org.apache.hudi.table.upgrade.TestUpgradeDowngrade.setUp(TestUpgradeDowngrade.java:89)
=====[ 2826 seconds still running ]=====
[INFO] Tests run: 23, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 353.046 s - in org.apache.hudi.table.TestHoodieMergeOnReadTable
[INFO]
[INFO] Results:
[INFO]
[ERROR] Errors:
[ERROR] org.apache.hudi.table.upgrade.TestUpgradeDowngrade.testDowngrade(boolean,HoodieTableType)
[ERROR] Run 1: TestUpgradeDowngrade.setUp:89->HoodieClientTestHarness.initDFS:255 » NullPointer
[INFO] Run 2: PASS
[INFO] Run 3: PASS
[INFO] Run 4: PASS
[INFO]
[INFO]
[ERROR] Tests run: 360, Failures: 0, Errors: 1, Skipped: 1
[INFO]
----------------------------------
val schema = StructType( Array(
StructField("rowId", StringType,true),
StructField("partitionId", StringType,true),
StructField("preComb", LongType,true),
StructField("name", StringType,true),
StructField("versionId", StringType,true),
StructField("doubleToInt", DoubleType,true)
))
val data0 = Seq(Row("row_1", "part_0",0L,"bob","v_0",0.0),
Row("row_2", "part_0",0L,"john","v_0",0.0),
Row("row_3", "part_0",0L,"tom","v_0",0.0))
var dfFromData0 = spark.createDataFrame(data0,schema)
dfFromData0.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "insert").
mode(Overwrite).
save(basePath)
// evolve the schema: change doubleToInt from DoubleType to IntegerType
val schemaEvolved = StructType( Array(
StructField("rowId", StringType,true),
StructField("partitionId", StringType,true),
StructField("preComb", LongType,true),
StructField("name", StringType,true),
StructField("versionId", StringType,true),
StructField("doubleToInt", IntegerType,true)
))
// upsert with the evolved schema: updates for row_2 and row_3, plus a new insert for row_9
val data5 = Seq(Row("row_2", "part_0",5L,"john","v_3",1),
Row("row_3", "part_0",5L,"maroon","v_2",1),
Row("row_9", "part_0",5L,"michael","v_2",2))
var dfFromData5 = spark.createDataFrame(data5,schemaEvolved)
dfFromData5.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
var tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select rowId, partitionId, preComb, name, versionId, doubleToInt from hudi_trips_snapshot").show()
double to integer schema evolution:
https://gist.github.com/nsivabalan/6bc1b17e6d87af56d114b654110af867
integer to double schema evolution:
val schema = StructType( Array(
StructField("rowId", StringType,true),
StructField("partitionId", StringType,true),
StructField("preComb", LongType,true),
StructField("name", StringType,true),
StructField("versionId", StringType,true),
StructField("intToDouble", IntegerType,true)
))
val data0 = Seq(Row("row_1", "part_0",0L,"bob","v_0",0),
Row("row_2", "part_0",0L,"john","v_0",0),
Row("row_3", "part_0",0L,"tom","v_0",0))
var dfFromData0 = spark.createDataFrame(data0,schema)
dfFromData0.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "insert").
mode(Overwrite).
save(basePath)
val schemaEvolved = StructType( Array(
StructField("rowId", StringType,true),
StructField("partitionId", StringType,true),
StructField("preComb", LongType,true),
StructField("name", StringType,true),
StructField("versionId", StringType,true),
StructField("intToDouble", DoubleType,true)
))
// upsert with the evolved schema: updates for row_2 and row_3, plus a new insert for row_9
val data1 = Seq(Row("row_2", "part_0",5L,"john","v_3",1.0),
Row("row_3", "part_0",5L,"maroon","v_2",1.0),
Row("row_9", "part_0",5L,"michael","v_2",1.0))
var dfFromData1 = spark.createDataFrame(data1,schemaEvolved)
dfFromData1.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
var tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select rowId, partitionId, preComb, name, versionId, intToDouble from hudi_trips_snapshot").show()
-------------------
MOR schema evolution: add one column
// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val schema = StructType( Array(
StructField("rowId", StringType,true),
StructField("partitionId", StringType,true),
StructField("preComb", LongType,true),
StructField("name", StringType,true)
))
val data0 = Seq(Row("row_1", "part_0",0L,"bob"),
Row("row_2", "part_0",0L,"john"),
Row("row_3", "part_0",0L,"tom"))
var dfFromData0 = spark.createDataFrame(data0,schema)
dfFromData0.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "insert").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
mode(Overwrite).
save(basePath)
var data1 = Seq(Row("row_2", "part_0",0L,"bob_1"),
Row("row_3", "part_0",0L,"john_1"),
Row("row_5", "part_0",0L,"braddy_0"))
var dfFromData1 = spark.createDataFrame(data1,schema)
dfFromData1.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_NAME, tableName).
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
mode(Append).
save(basePath)
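// write the same batch a second time (presumably to add one more delta commit / log file before evolving the schema)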
dfFromData1.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_NAME, tableName).
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
mode(Append).
save(basePath)
val schemaEvol = StructType( Array(
StructField("rowId", StringType,true),
StructField("partitionId", StringType,true),
StructField("preComb", LongType,true),
StructField("name", StringType,true),
StructField("newField", StringType,true)
))
val data2 = Seq(Row("row_1", "part_0",0L),
Row("row_2", "part_0",0L),
Row("row_6", "part_0",0L))
val data2 = Seq(Row("row_1", "part_0",0L,"bob","new_val1"),
Row("row_2", "part_0",0L,"john","new_val1"),
Row("row_6", "part_0",0L,"james","new_val1"))
var dfFromData2 = spark.createDataFrame(data2,schemaEvol)
dfFromData2.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.index.type","SIMPLE").
option(TABLE_NAME, tableName).
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
mode(Append).
save(basePath)
var tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select rowId, partitionId, preComb, name, newField from hudi_trips_snapshot").show()
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
val userSchema = "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"example.avro\",\"fields\":[{\"name\":\"rowKey\",\"type\":\"string\"},{\"name\":\"strField\",\"type\":\"string\"}]}"
dataGen.instantiateSchema(userSchema, "rowKey")
val deletes = convertDeletesToStringList(dataGen.generateDeletes(5))
val deletesDf = spark.read.json(spark.sparkContext.parallelize(deletes, 1))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "strField").
option(RECORDKEY_FIELD_OPT_KEY, "rowKey").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "strField").
option(RECORDKEY_FIELD_OPT_KEY, "rowKey").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
val userSchema = "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"example.avro\",\"fields\":[{\"name\":\"rowKey\",\"type\":\"string\"},{\"name\":\"strField\",\"type\":\"string\"},{\"name\":\"intField\",\"type\":\"int\"},{\"name\":\"longField\",\"type\":\"long\"},{\"name\":\"booleanField\",\"type\":\"boolean\"}]}"
// ensure the schema has the record key field. The partition path is added internally by the datagen tool, so it does not need to be part of userSchema. As of now, only SimpleKeyGenerator is supported for both record key and partition path, and the partition path field is hardcoded.
dataGen.instantiateSchema(userSchema, "ts_ms")
// spark-shell
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts_ms").
option(RECORDKEY_FIELD_OPT_KEY, "ts_ms").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
mode(Overwrite).
save(basePath)
var tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath + "/*/*/*/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select rowKey, partitionpath, strField, intField, longField, booleanField from hudi_trips_snapshot").show(false)
// spark-shell
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "timestamp").
option(RECORDKEY_FIELD_OPT_KEY, "_row_key").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
mode(Append).
save(basePath)
tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath + "/*/*/*/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select rowKey, partitionpath, strField, intField, longField, booleanField from hudi_trips_snapshot").show(false)
val deletes = convertDeletesToStringList(dataGen.generateDeletes(5))
val deletesDf = spark.read.json(spark.sparkContext.parallelize(deletes, 1))
deletesDf.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION_OPT_KEY,"delete").
option(PRECOMBINE_FIELD_OPT_KEY, "timestamp").
option(RECORDKEY_FIELD_OPT_KEY, "_row_key").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
mode(Append).
save(basePath)
tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath + "/*/*/*/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select rowKey, partitionpath, strField, intField, longField, booleanField from hudi_trips_snapshot").show(false)
{"type":"record","name":"triprec","fields":[{"name":"timestamp","type":"long"},{"name":"_row_key","type":"string"},{"name":"rider","type":"string"},{"name":"driver","type":"string"},{"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},{"name":"end_lon","type":"double"},{"name":"distance_in_meters","type":"int"},{"name":"seconds_since_epoch","type":"long"},{"name":"weight","type":"float"},{"name":"nation","type":"bytes"},{"name":"current_date","type":{"type":"int","logicalType":"date"}},{"name":"current_ts","type":"long"},{"name":"height","type":{"type":"fixed","name":"abc","size":5,"logicalType":"decimal","precision":10,"scale":6}},{"name":"city_to_state","type":{"type":"map","values":"string"}},{"name":"fare","type":{"type":"record","name":"fare","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}]}},{"name":"tip_history","type":{"type":"array","items":{"type":"record","name":"tip_history","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}],"default":null},"default":[]},"default":[]},{"name":"_hoodie_is_deleted","type":"boolean","default":false}]}
"{\"type\":\"record\",\"name\":\"triprec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"begin_lat\",\"type\":\"double\"},{\"name\":\"begin_lon\",\"type\":\"double\"},{\"name\":\"end_lat\",\"type\":\"double\"},{\"name\":\"end_lon\",\"type\":\"double\"},{\"name\":\"distance_in_meters\",\"type\":\"int\"},{\"name\":\"seconds_since_epoch\",\"type\":\"long\"},{\"name\":\"weight\",\"type\":\"float\"},{\"name\":\"nation\",\"type\":\"bytes\"},{\"name\":\"current_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},{\"name\":\"current_ts\",\"type\":\"long\"},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\":\"city_to_state\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"fare\",\"type\":{\"type\":\"record\",\"name\":\"fare\",\"fields\":[{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"currency\",\"type\":\"string\"}]}},{\"name\":\"tip_history\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"tip_history\",\"fields\":[{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"currency\",\"type\":\"string\"}],\"default\":null},\"default\":[]},\"default\":[]},{\"name\":\"_hoodie_is_deleted\",\"type\":\"boolean\",\"default\":false}]}"
Df schema StructType(StructField(_hoodie_is_deleted,BooleanType,true), StructField(_row_key,StringType,true), StructField(begin_lat,DoubleType,true), StructField(begin_lon,DoubleType,true), StructField(city_to_state,StructType(StructField(