@nsivabalan
Last active February 24, 2023 09:15
git diff
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index e2de9c7633..ede050b1b4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -33,8 +33,11 @@ import org.apache.hudi.client.heartbeat.HeartbeatUtils;
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.ActionType;
+import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
@@ -48,6 +51,8 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
@@ -102,6 +107,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
import static org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName;
import static org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY;
@@ -217,6 +223,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
return true;
}
LOG.info("Committing " + instantTime + " action " + commitActionType);
+
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable table = createTable(config, hadoopConf);
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds,
@@ -231,6 +238,10 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
extraPreCommitFunc.get().accept(table.getMetaClient(), metadata);
}
commit(table, commitActionType, instantTime, metadata, stats);
+
+ if (config.isMetadataTableEnabled()) {
+ validateFileGroups(table, stats);
+ }
postCommit(table, metadata, instantTime, extraMetadata);
LOG.info("Committed " + instantTime);
releaseResources();
@@ -267,8 +278,36 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
return true;
}
+  private void validateFileGroups(HoodieTable hoodieTable, List<HoodieWriteStat> writeStatList) {
+    LOG.info("Validating file groups for missed updates :: ");
+    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().enable(false).build();
+    HoodieTableFileSystemView fileSystemView = FileSystemViewManager.createInMemoryFileSystemView(context, hoodieTable.getMetaClient(), metadataConfig);
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, metadataConfig, config.getBasePath());
+    // Eagerly load the base files of every partition into the in-memory view.
+    partitionPaths.forEach(pPath -> fileSystemView.getLatestBaseFiles(pPath).collect(Collectors.toList()));
+    // Only log files need validation; base (parquet) files always start a fresh file slice.
+    List<HoodieWriteStat> toProcessStats = writeStatList.stream().filter(writeStat -> !writeStat.getPath().endsWith(".parquet")).collect(Collectors.toList());
+
+    for (HoodieWriteStat writeStat : toProcessStats) {
+      String fileID = writeStat.getFileId();
+      String filePath = writeStat.getPath();
+      String pPath = writeStat.getPartitionPath();
+      String logFilesBaseCommitTime = FSUtils.getCommitTime(filePath);
+      Option<FileSlice> fileSlice = fileSystemView.getLatestFileSlice(pPath, fileID);
+      if (!fileID.isEmpty() && fileSlice.isPresent()) {
+        String latestBaseFileInstantTime = fileSlice.get().getBaseInstantTime();
+        if (!logFilesBaseCommitTime.equals(latestBaseFileInstantTime)) {
+          throw new HoodieIOException("XXX Log file added to last but one file slice for " + pPath + ", " + fileID + ", new log file " + filePath
+              + ", latest base file commit time : " + latestBaseFileInstantTime);
+        } else {
+          LOG.debug("Log file added to latest file slice for " + pPath + ", " + fileID + ", new log file " + filePath
+              + ", latest base file commit time : " + latestBaseFileInstantTime);
+        }
+      }
+    }
+  }
+
protected void commit(HoodieTable table, String commitActionType, String instantTime, HoodieCommitMetadata metadata,
- List<HoodieWriteStat> stats) throws IOException {
+ List<HoodieWriteStat> stats) throws IOException {
LOG.info("Committing " + instantTime + " action " + commitActionType);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
// Finalize write
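
For context, the validateFileGroups hook added above compares the base instant encoded in each new log file's name against the base instant of the latest file slice in that file group; a mismatch means the delta commit attached a log file to a stale slice. A minimal standalone sketch of that comparison follows; the file-name layout and helper names are illustrative assumptions, not Hudi's actual APIs.

import java.util.Optional;

public class FileSliceCheck {

    // Simplified stand-in for extracting the base commit time from a log file
    // name. The ".<fileId>_<baseCommitTime>.log.<version>_<writeToken>" layout
    // assumed here is illustrative, not Hudi's exact naming scheme.
    static String baseCommitTimeOf(String logFileName) {
        int underscore = logFileName.indexOf('_');
        int logSuffix = logFileName.indexOf(".log");
        return logFileName.substring(underscore + 1, logSuffix);
    }

    // Throws when a log file's base commit time does not match the latest file
    // slice's base instant, i.e. the log file landed on a stale slice.
    static void validate(String logFileName, Optional<String> latestSliceBaseInstant) {
        if (!latestSliceBaseInstant.isPresent()) {
            return; // nothing to compare against
        }
        String logBase = baseCommitTimeOf(logFileName);
        if (!logBase.equals(latestSliceBaseInstant.get())) {
            throw new IllegalStateException("log file " + logFileName
                + " attached to slice " + logBase
                + " but latest slice base instant is " + latestSliceBaseInstant.get());
        }
    }

    public static void main(String[] args) {
        validate(".f1_20230224091500.log.1_0-1-1", Optional.of("20230224091500")); // ok: latest slice
        try {
            validate(".f1_20230224080000.log.1_0-1-1", Optional.of("20230224091500"));
        } catch (IllegalStateException e) {
            System.out.println("caught expected mismatch: " + e.getMessage());
        }
    }
}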
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 2ccd0435d3..d38c5940ee 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -68,6 +68,7 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
+import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
import org.apache.hudi.table.RandomFileIdPrefixProvider;
@@ -3042,7 +3043,9 @@ public class HoodieWriteConfig extends HoodieConfig {
if (writeConfig.isEmbeddedTimelineServerEnabled()) {
return MarkerType.TIMELINE_SERVER_BASED.toString();
} else {
- LOG.warn("Embedded timeline server is disabled, fallback to use direct marker type for spark");
+ if (!HoodieTableMetadata.isMetadataTable(this.writeConfig.getBasePath())) {
+ LOG.warn("Embedded timeline server is disabled, fallback to use direct marker type for spark");
+ }
return MarkerType.DIRECT.toString();
}
case FLINK:
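
Aside: the guard added above suppresses the repeated fallback warning only when the write config belongs to the internal metadata table, which otherwise logs it on every writer construction. A rough sketch of a path-based check like isMetadataTable; the ".hoodie/metadata" suffix is my assumption about the table layout, not a quote of Hudi's implementation.

public class MetadataTablePathCheck {
    // Hypothetical simplification: the metadata table sits under the data table
    // at <basePath>/.hoodie/metadata, so a suffix check identifies it.
    static boolean looksLikeMetadataTable(String basePath) {
        if (basePath == null) {
            return false;
        }
        // Strip trailing slashes before comparing the suffix.
        String normalized = basePath.replaceAll("/+$", "");
        return normalized.endsWith(".hoodie/metadata");
    }

    public static void main(String[] args) {
        System.out.println(looksLikeMetadataTable("/tmp/tbl"));                   // false
        System.out.println(looksLikeMetadataTable("/tmp/tbl/.hoodie/metadata/")); // true
    }
}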
diff --git a/hudi-tests-common/src/main/resources/log4j2-surefire.properties b/hudi-tests-common/src/main/resources/log4j2-surefire.properties
index 6b6b2fa5e5..b4da3f6cc7 100644
--- a/hudi-tests-common/src/main/resources/log4j2-surefire.properties
+++ b/hudi-tests-common/src/main/resources/log4j2-surefire.properties
@@ -32,6 +32,6 @@ rootLogger.appenderRef.stdout.ref = CONSOLE
logger.apache.name = org.apache
logger.apache.level = info
logger.hudi.name = org.apache.hudi
-logger.hudi.level = debug
+logger.hudi.level = warn
logger.hbase.name = org.apache.hadoop.hbase
logger.hbase.level = error
\ No newline at end of file
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 4d91e5076e..ebc3d2c2bc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -129,8 +129,6 @@ import static org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName;
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
import static org.apache.hudi.common.table.HoodieTableConfig.DROP_PARTITION_COLUMNS;
import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE;
-import static org.apache.hudi.config.HoodieClusteringConfig.INLINE_CLUSTERING;
-import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT;
import static org.apache.hudi.config.HoodieWriteConfig.AUTO_COMMIT_ENABLE;
import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT;
import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT;
@@ -139,6 +137,7 @@ import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SP
import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY;
+import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.counter;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
@@ -712,7 +711,7 @@ public class DeltaSync implements Serializable, Closeable {
String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, Collections.emptyMap());
if (success) {
- LOG.info("Commit " + instantTime + " successful!");
+ LOG.warn("Commit " + instantTime + " successful! " + counter.incrementAndGet());
this.formatAdapter.getSource().onCommit(checkpointStr);
// Schedule compaction if needed
if (cfg.isAsyncCompactionEnabled()) {
@@ -916,10 +915,10 @@ public class DeltaSync implements Serializable, Closeable {
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.from(props);
// Validate what deltastreamer assumes of write-config to be really safe
- ValidationUtils.checkArgument(config.inlineCompactionEnabled() == cfg.isInlineCompactionEnabled(),
- String.format("%s should be set to %s", INLINE_COMPACT.key(), cfg.isInlineCompactionEnabled()));
- ValidationUtils.checkArgument(config.inlineClusteringEnabled() == clusteringConfig.isInlineClusteringEnabled(),
- String.format("%s should be set to %s", INLINE_CLUSTERING.key(), clusteringConfig.isInlineClusteringEnabled()));
+ //ValidationUtils.checkArgument(config.inlineCompactionEnabled() == cfg.isInlineCompactionEnabled(),
+ // String.format("%s should be set to %s", INLINE_COMPACT.key(), cfg.isInlineCompactionEnabled()));
+ //ValidationUtils.checkArgument(config.inlineClusteringEnabled() == clusteringConfig.isInlineClusteringEnabled(),
+ // String.format("%s should be set to %s", INLINE_CLUSTERING.key(), clusteringConfig.isInlineClusteringEnabled()));
ValidationUtils.checkArgument(config.isAsyncClusteringEnabled() == clusteringConfig.isAsyncClusteringEnabled(),
String.format("%s should be set to %s", ASYNC_CLUSTERING_ENABLE.key(), clusteringConfig.isAsyncClusteringEnabled()));
ValidationUtils.checkArgument(!config.shouldAutoCommit(),
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 834938a317..a383b75db3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -83,6 +83,7 @@ import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
import static java.lang.String.format;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
@@ -117,6 +118,8 @@ public class HoodieDeltaStreamer implements Serializable {
public static final String DELTASYNC_POOL_NAME = "hoodiedeltasync";
+ public static AtomicInteger counter = new AtomicInteger(0);
+
public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc) throws IOException {
this(cfg, jssc, FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()),
jssc.hadoopConfiguration(), Option.empty());
@@ -425,7 +428,7 @@ public class HoodieDeltaStreamer implements Serializable {
public boolean isInlineCompactionEnabled() {
// Inline compaction is disabled for continuous mode, otherwise enabled for MOR
- return !continuousMode && !forceDisableCompaction
+ return !forceDisableCompaction
&& HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType));
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index eb6ab80b5f..5b3cb4cc9d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -821,9 +821,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow", true, recordType);
}
- @Disabled("HUDI-5815 for investigation")
+ // @Disabled("HUDI-5815 for investigation")
@ParameterizedTest
- @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
+ @EnumSource(value = HoodieRecordType.class, names = {"AVRO"})
public void testUpsertsMORContinuousMode(HoodieRecordType recordType) throws Exception {
testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor", recordType);
}
@@ -863,6 +863,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
cfg.tableType = tableType.name();
cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfg.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
+ cfg.configs.add(String.format("%s=4", HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key()));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
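
The added test config makes compaction fire quickly in the continuous-mode run: INLINE_COMPACT_NUM_DELTA_COMMITS sets how many delta commits accumulate before an inline compaction is scheduled. As a raw property it would look like the line below; the key string is my recollection of the constant's value, so treat it as an assumption.

hoodie.compact.inline.max.delta.commits=4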