@nsivabalan
Last active December 16, 2021 17:33
Run the Hudi docker demo setup.
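The docker environment is brought up roughly as follows (a sketch, assuming the Apache Hudi repo is checked out locally; the script and container names come from the Hudi docker demo docs):

git clone https://github.com/apache/hudi.git
cd hudi/docker
./setup_demo.sh
docker exec -it adhoc-1 /bin/bash

Inside the adhoc-1 container, copy the Hive/Hadoop configs into Spark's conf directory: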
root@adhoc-1:/opt# cp hadoop-2.8.4/etc/hadoop/hive-site.xml spark/conf/
root@adhoc-1:/opt# cp hadoop-2.8.4/etc/hadoop/core-site.xml spark/conf/
root@adhoc-1:/opt# cp hadoop-2.8.4/etc/hadoop/hdfs-site.xml spark/conf/
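Then launch spark-shell with the Hudi spark bundle and the spark-avro package: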
$SPARK_INSTALL/bin/spark-shell --jars $HUDI_SPARK_BUNDLE --master local[2] --driver-class-path $HADOOP_CONF_DIR --conf spark.sql.hive.convertMetastoreParquet=false --conf spark.sql.catalogImplementation=hive --deploy-mode client --driver-memory 1G --executor-memory 3G --num-executors 1 --packages org.apache.spark:spark-avro_2.11:2.4.4
// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val tableName = "hudi_trips_cow"
val basePath = "/tmp/hudi_trips_cow"
val schema = StructType(Array(
  StructField("rowId", StringType, true),
  StructField("partitionId", StringType, true),
  StructField("preComb", LongType, true),
  StructField("name", StringType, true),
  StructField("versionId", StringType, true),
  StructField("toBeDeletedStr", StringType, true),
  StructField("intToLong", IntegerType, true),
  StructField("longToInt", LongType, true)
))
val data0 = Seq(Row("row_1", "2021/01/01",0L,"bob","v_0","toBeDel0",0,1000000L),
Row("row_2", "2021/01/01",0L,"john","v_0","toBeDel0",0,1000000L),
Row("row_3", "2021/01/01",0L,"tom","v_0","toBeDel0",0,1000000L))
var dfFromData0 = spark.createDataFrame(data0,schema)
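// Write the table with hive-style partitioning, Hive sync enabled, and
// hoodie.datasource.write.drop.partition.columns=true (the option being exercised here).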
dfFromData0.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option("hoodie.datasource.write.hive_style_partitioning","true").
option(TABLE_NAME, tableName).
option("hoodie.index.type","SIMPLE").
option(OPERATION_OPT_KEY, "insert").
option("hoodie.datasource.hive_sync.jdbcurl","jdbc:hive2://hiveserver:10000/").
option("hoodie.datasource.hive_sync.database","testdb").
option("hoodie.datasource.hive_sync.table","testtable1").
option("hoodie.datasource.hive_sync.partition_fields","partitionId").
option("hoodie.datasource.hive_sync.enable","true").
option("hoodie.datasource.write.drop.partition.columns","true").
mode(Overwrite).
save(basePath)
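// Note: with hoodie.datasource.write.drop.partition.columns=true, 'partitionId' is not
// written to the data files. The plain base-path read below then fails, because
// HoodieFileIndex resolves the partition schema from the table config and cannot find
// 'partitionId' in the data schema (see the stack trace).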
scala> val tripsSnapshotDF = spark.read.format("hudi").load(basePath)
java.lang.IllegalArgumentException: Cannot find column: 'partitionId' in the schema[StructField(_hoodie_commit_time,StringType,true),StructField(_hoodie_commit_seqno,StringType,true),StructField(_hoodie_record_key,StringType,true),StructField(_hoodie_partition_path,StringType,true),StructField(_hoodie_file_name,StringType,true),StructField(rowId,StringType,true),StructField(preComb,LongType,true),StructField(name,StringType,true),StructField(versionId,StringType,true),StructField(toBeDeletedStr,StringType,true),StructField(intToLong,IntegerType,true),StructField(longToInt,LongType,true)]
at org.apache.hudi.HoodieFileIndex$$anonfun$5$$anonfun$apply$1.apply(HoodieFileIndex.scala:106)
at org.apache.hudi.HoodieFileIndex$$anonfun$5$$anonfun$apply$1.apply(HoodieFileIndex.scala:106)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.AbstractMap.getOrElse(Map.scala:59)
at org.apache.hudi.HoodieFileIndex$$anonfun$5.apply(HoodieFileIndex.scala:106)
at org.apache.hudi.HoodieFileIndex$$anonfun$5.apply(HoodieFileIndex.scala:105)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.hudi.HoodieFileIndex._partitionSchemaFromProperties$lzycompute(HoodieFileIndex.scala:105)
at org.apache.hudi.HoodieFileIndex._partitionSchemaFromProperties(HoodieFileIndex.scala:99)
at org.apache.hudi.HoodieFileIndex.getAllQueryPartitionPaths(HoodieFileIndex.scala:348)
at org.apache.hudi.HoodieFileIndex.loadPartitionPathFiles(HoodieFileIndex.scala:420)
at org.apache.hudi.HoodieFileIndex.refresh0(HoodieFileIndex.scala:214)
at org.apache.hudi.HoodieFileIndex.<init>(HoodieFileIndex.scala:149)
at org.apache.hudi.DefaultSource.getBaseFileOnlyView(DefaultSource.scala:199)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:116)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:67)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
... 63 elided
// With a glob path specified, the read works as expected.
val tripsSnapshotDF = spark.read.format("hudi").load(basePath + "/*/*/*")
tripsSnapshotDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 10 more fields]
scala> tripsSnapshotDF.printSchema
root
|-- _hoodie_commit_time: string (nullable = true)
|-- _hoodie_commit_seqno: string (nullable = true)
|-- _hoodie_record_key: string (nullable = true)
|-- _hoodie_partition_path: string (nullable = true)
|-- _hoodie_file_name: string (nullable = true)
|-- rowId: string (nullable = true)
|-- preComb: long (nullable = true)
|-- name: string (nullable = true)
|-- versionId: string (nullable = true)
|-- toBeDeletedStr: string (nullable = true)
|-- intToLong: integer (nullable = true)
|-- longToInt: long (nullable = true)
scala> tripsSnapshotDF.registerTempTable("tbl")
warning: there was one deprecation warning; re-run with -deprecation for details
scala> spark.sql("desc tbl").show()
+--------------------+---------+-------+
| col_name|data_type|comment|
+--------------------+---------+-------+
| _hoodie_commit_time| string| null|
|_hoodie_commit_seqno| string| null|
| _hoodie_record_key| string| null|
|_hoodie_partition...| string| null|
| _hoodie_file_name| string| null|
| rowId| string| null|
| preComb| bigint| null|
| name| string| null|
| versionId| string| null|
| toBeDeletedStr| string| null|
| intToLong| int| null|
| longToInt| bigint| null|
+--------------------+---------+-------+
scala> spark.sql("select * from tbl limit 5").show()
+-------------------+--------------------+------------------+----------------------+--------------------+-----+-------+----+---------+--------------+---------+---------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name|rowId|preComb|name|versionId|toBeDeletedStr|intToLong|longToInt|
+-------------------+--------------------+------------------+----------------------+--------------------+-----+-------+----+---------+--------------+---------+---------+
| 20211216163108| 20211216163108_0_1| row_1| partitionId=2021/...|e9f0f8cb-f21a-472...|row_1| 0| bob| v_0| toBeDel0| 0| 1000000|
| 20211216163108| 20211216163108_0_2| row_2| partitionId=2021/...|e9f0f8cb-f21a-472...|row_2| 0|john| v_0| toBeDel0| 0| 1000000|
| 20211216163108| 20211216163108_0_3| row_3| partitionId=2021/...|e9f0f8cb-f21a-472...|row_3| 0| tom| v_0| toBeDel0| 0| 1000000|
+-------------------+--------------------+------------------+----------------------+--------------------+-----+-------+----+---------+--------------+---------+---------+
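Since Hive sync was enabled, the table registered in the metastore should also be queryable directly; there, the metastore supplies the dropped partition column. An illustrative query, not captured in the original run (testdb.testtable1 comes from the hive_sync options above):

scala> spark.sql("select rowId, partitionId, name from testdb.testtable1").show()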