./bin/spark-shell --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --jars ~/Documents/personal/projects/nov26/hudi/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-0.13.0-SNAPSHOT.jar
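// launch flags: the Kryo serializer and the Hudi Spark session extension are the standard Hudi spark-shell settings; the --jars path points at a locally built 0.13.0-SNAPSHOT spark bundle.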
// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
// spark-shell
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
// spark-shell
val tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
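// quick sanity check on the snapshot view (hypothetical query; any of the generated trip columns would do)
spark.sql("select uuid, partitionpath, ts from hudi_trips_snapshot").show(false)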
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
spark.time(df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option("hoodie.datasource.hive_sync.enable","true").
option("hoodie.datasource.meta.sync.enable","true").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath))
// output. Note that no timing is printed: spark.time(command) only reports the elapsed time when the wrapped command succeeds, and here the write fails during meta sync (see the git diff at the bottom, which makes instantiateMetaSyncTool throw unconditionally).
spark.time(df.write.format("hudi").
| options(getQuickstartWriteConfigs).
| option(PRECOMBINE_FIELD_OPT_KEY, "ts").
| option(RECORDKEY_FIELD_OPT_KEY, "uuid").
| option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
| option("hoodie.datasource.hive_sync.enable","true").
| option("hoodie.datasource.meta.sync.enable","true").
| option(TABLE_NAME, tableName).
| mode(Append).
| save(basePath))
warning: there was one deprecation warning; re-run with -deprecation for details
23/01/12 12:53:59 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
23/01/12 12:54:01 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
org.apache.hudi.exception.HoodieException: Could not sync using the meta sync class org.apache.hudi.hive.HiveSyncTool
at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:59)
at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$metaSync$2.apply(HoodieSparkSqlWriter.scala:850)
at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$metaSync$2.apply(HoodieSparkSqlWriter.scala:848)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
at org.apache.hudi.HoodieSparkSqlWriter$.metaSync(HoodieSparkSqlWriter.scala:848)
at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:945)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:367)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:149)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:696)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:305)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:291)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:249)
at $anonfun$1.apply$mcV$sp(<console>:58)
at $anonfun$1.apply(<console>:58)
at $anonfun$1.apply(<console>:58)
at org.apache.spark.sql.SparkSession.time(SparkSession.scala:678)
... 70 elided
Caused by: org.apache.hudi.exception.HoodieException: Could not load meta sync class org.apache.hudi.hive.HiveSyncTool: no valid constructor found.
at org.apache.hudi.sync.common.util.SyncUtilHelpers.instantiateMetaSyncTool(SyncUtilHelpers.java:80)
at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:57)
... 102 more
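// For contrast, a minimal sketch of spark.time wrapping a command that completes (hypothetical; any cheap action would do). The "Time taken" line only prints when the wrapped expression returns normally:
spark.time(spark.range(1000000).count())
// Time taken: ... ms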
// let's retry without hive sync to see the timer.
// hive sync is disabled here.
spark.time(
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath))
// output
scala> spark.time(df.write.format("hudi").
| options(getQuickstartWriteConfigs).
| option(PRECOMBINE_FIELD_OPT_KEY, "ts").
| option(RECORDKEY_FIELD_OPT_KEY, "uuid").
| option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
| option(TABLE_NAME, tableName).
| mode(Append).
| save(basePath))
warning: there was one deprecation warning; re-run with -deprecation for details
23/01/12 12:54:39 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
23/01/12 12:54:41 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
Time taken: 3124 ms
scala>
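// The diff below is the local change used to reproduce the failure above: SyncUtilHelpers.instantiateMetaSyncTool is made to throw unconditionally, with the original constructor-resolution chain commented out.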
git diff
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
index 26f90facdd..251ebcd6ff 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
@@ -20,7 +20,6 @@
package org.apache.hudi.sync.common.util;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.sync.common.HoodieSyncTool;
@@ -31,7 +30,6 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.Collection;
-import java.util.Properties;
/**
* Helper class for syncing Hudi commit data with external metastores.
@@ -79,31 +77,33 @@ public class SyncUtilHelpers {
"Table name \"" + tableName + "\" contains capital letters. Your metastore may automatically convert this to lower case and can cause table not found errors during subsequent syncs.");
}
}
+ throw new HoodieException("Could not load meta sync class " + syncToolClassName
+ + ": no valid constructor found.");
- if (ReflectionUtils.hasConstructor(syncToolClassName,
- new Class<?>[] {Properties.class, Configuration.class})) {
- return ((HoodieSyncTool) ReflectionUtils.loadClass(syncToolClassName,
- new Class<?>[] {Properties.class, Configuration.class},
- properties, hadoopConfig));
- } else if (ReflectionUtils.hasConstructor(syncToolClassName,
- new Class<?>[] {Properties.class})) {
- return ((HoodieSyncTool) ReflectionUtils.loadClass(syncToolClassName,
- new Class<?>[] {Properties.class},
- properties));
- } else if (ReflectionUtils.hasConstructor(syncToolClassName,
- new Class<?>[] {TypedProperties.class, Configuration.class, FileSystem.class})) {
- return ((HoodieSyncTool) ReflectionUtils.loadClass(syncToolClassName,
- new Class<?>[] {TypedProperties.class, Configuration.class, FileSystem.class},
- properties, hadoopConfig, fs));
- } else if (ReflectionUtils.hasConstructor(syncToolClassName,
- new Class<?>[] {Properties.class, FileSystem.class})) {
- return ((HoodieSyncTool) ReflectionUtils.loadClass(syncToolClassName,
- new Class<?>[] {Properties.class, FileSystem.class},
- properties, fs));
- } else {
- throw new HoodieException("Could not load meta sync class " + syncToolClassName
- + ": no valid constructor found.");
- }
+// if (ReflectionUtils.hasConstructor(syncToolClassName,
+// new Class<?>[] {Properties.class, Configuration.class})) {
+// return ((HoodieSyncTool) ReflectionUtils.loadClass(syncToolClassName,
+// new Class<?>[] {Properties.class, Configuration.class},
+// properties, hadoopConfig));
+// } else if (ReflectionUtils.hasConstructor(syncToolClassName,
+// new Class<?>[] {Properties.class})) {
+// return ((HoodieSyncTool) ReflectionUtils.loadClass(syncToolClassName,
+// new Class<?>[] {Properties.class},
+// properties));
+// } else if (ReflectionUtils.hasConstructor(syncToolClassName,
+// new Class<?>[] {TypedProperties.class, Configuration.class, FileSystem.class})) {
+// return ((HoodieSyncTool) ReflectionUtils.loadClass(syncToolClassName,
+// new Class<?>[] {TypedProperties.class, Configuration.class, FileSystem.class},
+// properties, hadoopConfig, fs));
+// } else if (ReflectionUtils.hasConstructor(syncToolClassName,
+// new Class<?>[] {Properties.class, FileSystem.class})) {
+// return ((HoodieSyncTool) ReflectionUtils.loadClass(syncToolClassName,
+// new Class<?>[] {Properties.class, FileSystem.class},
+// properties, fs));
+// } else {
+// throw new HoodieException("Could not load meta sync class " + syncToolClassName
+// + ": no valid constructor found.");
+// }
}
public static HoodieException getExceptionFromList(Collection<HoodieException> exceptions) {
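// To inspect which constructors a sync tool class actually exposes, a quick spark-shell sketch (hypothetical debugging aid using plain Java reflection, run from the same session):
Class.forName("org.apache.hudi.hive.HiveSyncTool").getConstructors.foreach(println)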