./bin/spark-shell --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --jars ~/Documents/personal/projects/nov26/hudi/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-0.13.0-SNAPSHOT.jar

// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord

val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator

// spark-shell: insert 10 records
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

// spark-shell: snapshot read
val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

// timed upsert, with hive sync and meta sync enabled
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
spark.time(df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option("hoodie.datasource.hive_sync.enable","true").
  option("hoodie.datasource.meta.sync.enable","true").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath))
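// Note: with hoodie.datasource.hive_sync.enable / hoodie.datasource.meta.sync.enable
// set to true, the commit runs HiveSyncTool via SyncUtilHelpers.runHoodieMetaSync
// (visible in the stack trace below), which the local patch at the end of this gist
// deliberately forces to fail.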
// output. Note that no "Time taken" line appears: spark.time(command) prints the
// elapsed time only if the command completes successfully, and this write throws
// during meta sync (see the workaround sketch after the stack trace).
scala> spark.time(df.write.format("hudi").
     | options(getQuickstartWriteConfigs).
     | option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     | option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     | option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
     | option("hoodie.datasource.hive_sync.enable","true").
     | option("hoodie.datasource.meta.sync.enable","true").
     | option(TABLE_NAME, tableName).
     | mode(Append).
     | save(basePath))
warning: there was one deprecation warning; re-run with -deprecation for details
23/01/12 12:53:59 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
23/01/12 12:54:01 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
org.apache.hudi.exception.HoodieException: Could not sync using the meta sync class org.apache.hudi.hive.HiveSyncTool
  at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:59)
  at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$metaSync$2.apply(HoodieSparkSqlWriter.scala:850)
  at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$metaSync$2.apply(HoodieSparkSqlWriter.scala:848)
  at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
  at org.apache.hudi.HoodieSparkSqlWriter$.metaSync(HoodieSparkSqlWriter.scala:848)
  at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:945)
  at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:367)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:149)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:696)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:305)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:291)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:249)
  at $anonfun$1.apply$mcV$sp(<console>:58)
  at $anonfun$1.apply(<console>:58)
  at $anonfun$1.apply(<console>:58)
  at org.apache.spark.sql.SparkSession.time(SparkSession.scala:678)
  ... 70 elided
Caused by: org.apache.hudi.exception.HoodieException: Could not load meta sync class org.apache.hudi.hive.HiveSyncTool: no valid constructor found.
  at org.apache.hudi.sync.common.util.SyncUtilHelpers.instantiateMetaSyncTool(SyncUtilHelpers.java:80)
  at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:57)
  ... 102 more
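// Why the timer is missing: SparkSession.time prints "Time taken" only after the
// timed block returns normally, so the HoodieException above propagates before any
// timing output. A minimal workaround sketch (not part of the original run): time
// the write yourself with try/finally so the duration is printed even when the
// command fails. The exception still propagates after the println.
val start = System.nanoTime()
try {
  df.write.format("hudi").
    options(getQuickstartWriteConfigs).
    option(PRECOMBINE_FIELD_OPT_KEY, "ts").
    option(RECORDKEY_FIELD_OPT_KEY, "uuid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
    option("hoodie.datasource.hive_sync.enable","true").
    option("hoodie.datasource.meta.sync.enable","true").
    option(TABLE_NAME, tableName).
    mode(Append).
    save(basePath)
} finally {
  println(s"Time taken: ${(System.nanoTime() - start) / 1000000} ms")
}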
// Now let's retry without hive sync to see the timer.
// Hive sync is disabled for this write.
spark.time(
  df.write.format("hudi").
    options(getQuickstartWriteConfigs).
    option(PRECOMBINE_FIELD_OPT_KEY, "ts").
    option(RECORDKEY_FIELD_OPT_KEY, "uuid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
    option(TABLE_NAME, tableName).
    mode(Append).
    save(basePath))

// output
scala> spark.time(df.write.format("hudi").
     | options(getQuickstartWriteConfigs).
     | option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     | option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     | option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
     | option(TABLE_NAME, tableName).
     | mode(Append).
     | save(basePath))
warning: there was one deprecation warning; re-run with -deprecation for details
23/01/12 12:54:39 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
23/01/12 12:54:41 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
Time taken: 3124 ms

scala>
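// With meta sync disabled, the write succeeds and spark.time prints the elapsed
// time as expected.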
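The diff below is the local patch used to reproduce the failure above: SyncUtilHelpers.instantiateMetaSyncTool is made to throw unconditionally ("no valid constructor found"), with the original constructor-matching logic commented out.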
git diff

diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
index 26f90facdd..251ebcd6ff 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
@@ -20,7 +20,6 @@
 package org.apache.hudi.sync.common.util;
 
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sync.common.HoodieSyncConfig;
 import org.apache.hudi.sync.common.HoodieSyncTool;
@@ -31,7 +30,6 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.util.Collection;
-import java.util.Properties;
 
 /**
  * Helper class for syncing Hudi commit data with external metastores.
@@ -79,31 +77,33 @@ public class SyncUtilHelpers {
             "Table name \"" + tableName + "\" contains capital letters. Your metastore may automatically convert this to lower case and can cause table not found errors during subsequent syncs.");
       }
     }
+    throw new HoodieException("Could not load meta sync class " + syncToolClassName
+        + ": no valid constructor found.");
-    if (ReflectionUtils.hasConstructor(syncToolClassName,
-        new Class<?>[] {Properties.class, Configuration.class})) {
-      return ((HoodieSyncTool) ReflectionUtils.loadClass(syncToolClassName,
-          new Class<?>[] {Properties.class, Configuration.class},
-          properties, hadoopConfig));
-    } else if (ReflectionUtils.hasConstructor(syncToolClassName,
-        new Class<?>[] {Properties.class})) {
-      return ((HoodieSyncTool) ReflectionUtils.loadClass(syncToolClassName,
-          new Class<?>[] {Properties.class},
-          properties));
-    } else if (ReflectionUtils.hasConstructor(syncToolClassName,
-        new Class<?>[] {TypedProperties.class, Configuration.class, FileSystem.class})) {
-      return ((HoodieSyncTool) ReflectionUtils.loadClass(syncToolClassName,
-          new Class<?>[] {TypedProperties.class, Configuration.class, FileSystem.class},
-          properties, hadoopConfig, fs));
-    } else if (ReflectionUtils.hasConstructor(syncToolClassName,
-        new Class<?>[] {Properties.class, FileSystem.class})) {
-      return ((HoodieSyncTool) ReflectionUtils.loadClass(syncToolClassName,
-          new Class<?>[] {Properties.class, FileSystem.class},
-          properties, fs));
-    } else {
-      throw new HoodieException("Could not load meta sync class " + syncToolClassName
-          + ": no valid constructor found.");
-    }
+//    if (ReflectionUtils.hasConstructor(syncToolClassName,
+//        new Class<?>[] {Properties.class, Configuration.class})) {
+//      return ((HoodieSyncTool) ReflectionUtils.loadClass(syncToolClassName,
+//          new Class<?>[] {Properties.class, Configuration.class},
+//          properties, hadoopConfig));
+//    } else if (ReflectionUtils.hasConstructor(syncToolClassName,
+//        new Class<?>[] {Properties.class})) {
+//      return ((HoodieSyncTool) ReflectionUtils.loadClass(syncToolClassName,
+//          new Class<?>[] {Properties.class},
+//          properties));
+//    } else if (ReflectionUtils.hasConstructor(syncToolClassName,
+//        new Class<?>[] {TypedProperties.class, Configuration.class, FileSystem.class})) {
+//      return ((HoodieSyncTool) ReflectionUtils.loadClass(syncToolClassName,
+//          new Class<?>[] {TypedProperties.class, Configuration.class, FileSystem.class},
+//          properties, hadoopConfig, fs));
+//    } else if (ReflectionUtils.hasConstructor(syncToolClassName,
+//        new Class<?>[] {Properties.class, FileSystem.class})) {
+//      return ((HoodieSyncTool) ReflectionUtils.loadClass(syncToolClassName,
+//          new Class<?>[] {Properties.class, FileSystem.class},
+//          properties, fs));
+//    } else {
+//      throw new HoodieException("Could not load meta sync class " + syncToolClassName
+//          + ": no valid constructor found.");
+//    }
   }
 
   public static HoodieException getExceptionFromList(Collection<HoodieException> exceptions) {