docker exec -it adhoc-1 /bin/bash
$SPARK_INSTALL/bin/spark-shell \
--jars $HUDI_SPARK_BUNDLE \
--master local[2] \
--driver-class-path $HADOOP_CONF_DIR \
--conf spark.sql.hive.convertMetastoreParquet=false \
--deploy-mode client \
--driver-memory 1G \
--executor-memory 3G \
--num-executors 1 \
--packages org.apache.spark:spark-avro_2.11:2.4.4
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips_cow"
val basePath = "/tmp/hudi_trips_cow"
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option("hoodie.datasource.write.precombine.field", "ts").
option("hoodie.datasource.write.recordkey.field", "uuid").
option("hoodie.datasource.write.partitionpath.field", "driver").
option("hoodie.table.name", tableName).
option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
option("hoodie.datasource.write.hive_style_partitioning","true").
mode(Overwrite).
save(basePath)
// The partition path is set to the "driver" field so that we can test delta log files easily.
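// Optional sanity check (a sketch, not part of the original steps): read the table back with the
// Spark datasource and confirm how the 10 inserts are partitioned by "driver" (they should all share
// one value). Depending on the Hudi version, a glob path such as basePath + "/*" may be needed
// instead of basePath.
val snapshotDf = spark.read.format("hudi").load(basePath + "/*")
snapshotDf.groupBy("driver").count().show(false)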
exit
docker exec -it adhoc-2 /bin/bash
/var/hoodie/ws/hudi-sync/hudi-hive-sync/run_sync_tool.sh \
--jdbc-url jdbc:hive2://hiveserver:10000 \
--user hive \
--pass hive \
--partitioned-by _hoodie_partition_path \
--base-path /tmp/hudi_trips_cow \
--database default \
--table hudi_trips_cow \
--partition-value-extractor org.apache.hudi.hive.HiveStylePartitionValueExtractor
exit
docker exec -it adhoc-1 /bin/bash
$SPARK_INSTALL/bin/spark-shell \
--jars $HUDI_SPARK_BUNDLE \
--master local[2] \
--driver-class-path $HADOOP_CONF_DIR \
--conf spark.sql.hive.convertMetastoreParquet=false \
--deploy-mode client \
--driver-memory 1G \
--executor-memory 3G \
--num-executors 1 \
--packages org.apache.spark:spark-avro_2.11:2.4.4
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips_cow"
val basePath = "/tmp/hudi_trips_cow"
val dataGen = new DataGenerator
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
// Query the existing dataset to find the current "driver" value; all rows should share the same value.
// Every newly generated batch would otherwise carry a new driver value, which we don't want here,
// so look up the existing value (a lookup sketch follows) and set it explicitly on the update batch.
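// A minimal lookup sketch (not part of the original steps): read the existing table back with the
// Spark datasource and list the distinct "driver" values, then plug that value into lit(...) below.
// Depending on the Hudi version, a glob path such as basePath + "/*" may be needed instead of basePath.
val existingDf = spark.read.format("hudi").load(basePath + "/*")
existingDf.select("driver").distinct().show(false)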
val df1 = df.withColumn("driver",lit("driver-213"))
df1.write.format("hudi").
options(getQuickstartWriteConfigs).
option("hoodie.datasource.write.precombine.field", "ts").
option("hoodie.datasource.write.recordkey.field", "uuid").
option("hoodie.datasource.write.partitionpath.field", "driver").
option("hoodie.table.name", tableName).
option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
option("hoodie.datasource.write.hive_style_partitioning","true").
mode(Append).
save(basePath)
// Feel free to repeat the update step above 2 to 3 times. Since this runs on HDFS, the existing log
// file keeps growing in size, so you may not see the number of log files increase with every delta
// commit (the listing sketch below can be used to check).
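// Optional check (a sketch): list the partition directory and see which .log files exist after each
// delta commit. The directory name assumes hive-style partitioning and the lit("driver-213") value
// used in the update step; adjust it if your driver value differs.
import org.apache.hadoop.fs.{FileSystem, Path}
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val partitionFiles = fs.listStatus(new Path(basePath + "/driver=driver-213")).map(_.getPath.getName)
partitionFiles.filter(_.contains(".log")).foreach(println)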
exit
docker exec -it adhoc-2 /bin/bash
beeline -u jdbc:hive2://hiveserver:10000 \
--hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat \
--hiveconf hive.stats.autogather=false
select `_hoodie_commit_time`, count(*) as counts from hudi_trips_cow_ro group by `_hoodie_commit_time` ;
// The query below fails for the _rt table, while the same query above succeeds for the _ro table.
select `_hoodie_commit_time`, count(*) as counts from hudi_trips_cow_rt group by `_hoodie_commit_time` ;
0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, count(*) as counts from hudi_trips_cow_rt group by `_hoodie_commit_time` ;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
Error: org.apache.hive.service.cli.HiveSQLException: Error while processing statement: FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
at org.apache.hive.service.cli.operation.Operation.toSQLException(Operation.java:380)
at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:257)
at org.apache.hive.service.cli.operation.SQLOperation.access$800(SQLOperation.java:91)
at org.apache.hive.service.cli.operation.SQLOperation$BackgroundWork$1.run(SQLOperation.java:348)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1840)
at org.apache.hive.service.cli.operation.SQLOperation$BackgroundWork.run(SQLOperation.java:362)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748) (state=08S01,code=2)
// I did not even get to the point of running an incremental query, since the query above failed.