@nsivabalan
Created July 10, 2023 19:31
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index c4323081a30..8287faf3ae4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -23,6 +23,7 @@ import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieIndexPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -38,6 +39,7 @@ import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
@@ -51,6 +53,7 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.HoodieTimer;
@@ -91,6 +94,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -99,6 +103,7 @@ import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADAT
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.getIndexInflightInstant;
import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeIndexPlan;
import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
@@ -886,7 +891,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
* Update from {@code HoodieCleanMetadata}.
*
* @param cleanMetadata {@code HoodieCleanMetadata}
- * @param instantTime Timestamp at which the clean was completed
+ @param instantTime   Timestamp at which the clean was completed
*/
@Override
public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
@@ -899,7 +904,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
* Update from {@code HoodieRestoreMetadata}.
*
* @param restoreMetadata {@code HoodieRestoreMetadata}
- * @param instantTime Timestamp at which the restore was performed
+ @param instantTime   Timestamp at which the restore was performed
*/
@Override
public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) {
@@ -908,15 +913,39 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
// Since the restore has completed on the dataset, the latest write timeline instant is the one to which the
// restore was performed. If this is not present, then the restore was performed to the earliest commit. This
// could happen in case of bootstrap followed by rollback e.g. TestBootstrap#testFullBootstrapWithRegexModeWithUpdatesMOR.
- Option<HoodieInstant> restoreInstant = dataMetaClient.getActiveTimeline().getWriteTimeline().lastInstant();
- final String restoreToInstantTime;
+ /*Option<HoodieInstant> restoreInstant = dataMetaClient.getActiveTimeline().getRestoreTimeline().lastInstant();
+ final String restoreToInstantTime = restoreMetadata.getStartRestoreTime();
if (restoreInstant.isPresent()) {
restoreToInstantTime = restoreInstant.get().getTimestamp();
} else {
- restoreToInstantTime = metadataMetaClient.getActiveTimeline().filterCompletedInstants().firstInstant().get().getTimestamp();
+ restoreToInstantTime = dataMetaClient.getActiveTimeline().filterCompletedInstants().firstInstant().get().getTimestamp();
+ }*/
+
+ HoodieInstant restoreInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.RESTORE_ACTION, instantTime);
+ HoodieInstant requested = HoodieTimeline.getRestoreRequestedInstant(restoreInstant);
+ HoodieRestorePlan restorePlan = null;
+ try {
+ restorePlan = TimelineMetadataUtils.deserializeAvroMetadata(
+ dataMetaClient.getActiveTimeline().readRestoreInfoAsBytes(requested).get(), HoodieRestorePlan.class);
+ } catch (IOException e) {
+ throw new HoodieIOException("Deserialization of restore plan failed ", e);
+ }
+ final String restoreToInstantTime = restorePlan.getSavepointToRestoreTimestamp();
+
+ Option<HoodieInstant> latestCleanInstant = metadataMetaClient.getActiveTimeline().getCleanerTimeline().filterCompletedInstants().lastInstant();
+ if (latestCleanInstant.isPresent()) {
+ // a clean has run on the MDT, so older base files may have been removed; ensure a base file at or before the restore target still exists
+ List<HoodieFileGroup> fileGroups = metadata.getMetadataFileSystemView().getAllFileGroups("files").collect(Collectors.toList());
+ boolean canRestore = fileGroups.get(0).getAllFileSlices().map(FileSlice::getBaseInstantTime).anyMatch(
+ baseInstantTime -> HoodieTimeline.compareTimestamps(baseInstantTime, LESSER_THAN_OR_EQUALS, restoreToInstantTime));
+ if (!canRestore) {
+ throw new HoodieMetadataException("Cannot restore: the MDT has no base file at or before the commit to restore to. " +
+ "Please delete the metadata table and retry");
+ }
+ } else {
+ // no clean has run on the MDT yet, so no base files have been removed; safe to proceed
}
- // We cannot restore to before the oldest compaction on MDT as we don't have the base files before that time.
Option<HoodieInstant> oldestCompaction = metadataMetaClient.getCommitTimeline().filterCompletedInstants().firstInstant();
if (oldestCompaction.isPresent()) {
if (HoodieTimeline.LESSER_THAN_OR_EQUALS.test(restoreToInstantTime, oldestCompaction.get().getTimestamp())) {
@@ -982,7 +1011,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
* Update from {@code HoodieRollbackMetadata}.
*
* @param rollbackMetadata {@code HoodieRollbackMetadata}
- * @param instantTime Timestamp at which the rollback was performed
+ @param instantTime   Timestamp at which the rollback was performed
*/
@Override
public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) {
@@ -1018,7 +1047,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
String rollbackInstantTime = createRollbackTimestamp(instantTime);
processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, metadataMetaClient.getActiveTimeline(),
- dataMetaClient, rollbackMetadata, instantTime));
+     dataMetaClient, rollbackMetadata, instantTime));
if (deltacommitsSinceCompaction.containsInstant(deltaCommitInstant)) {
LOG.info("Rolling back MDT deltacommit " + commitInstantTime);
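For orientation, the net effect of the hunk above: read the restore plan off the REQUESTED restore instant on the data timeline, take its savepointToRestoreTimestamp as the restore target, and refuse the restore if the MDT's "files" partition no longer holds a base file at or before that target. A condensed sketch of that flow (not the literal patch), assuming dataMetaClient, metadata, and instantTime are in scope as in update(HoodieRestoreMetadata, String):

HoodieInstant requested = HoodieTimeline.getRestoreRequestedInstant(
    new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.RESTORE_ACTION, instantTime));
HoodieRestorePlan restorePlan;
try {
  // the restore plan records the savepoint/commit the table is being restored to
  restorePlan = TimelineMetadataUtils.deserializeAvroMetadata(
      dataMetaClient.getActiveTimeline().readRestoreInfoAsBytes(requested).get(), HoodieRestorePlan.class);
} catch (IOException e) {
  throw new HoodieIOException("Deserialization of restore plan failed ", e);
}
final String restoreToInstantTime = restorePlan.getSavepointToRestoreTimestamp();
// the MDT can only be taken back to restoreToInstantTime if some base file in its
// "files" partition was written at or before that instant
boolean canRestore = metadata.getMetadataFileSystemView().getAllFileGroups("files")
    .flatMap(HoodieFileGroup::getAllFileSlices)
    .map(FileSlice::getBaseInstantTime)
    .anyMatch(ts -> HoodieTimeline.compareTimestamps(ts, HoodieTimeline.LESSER_THAN_OR_EQUALS, restoreToInstantTime));

The patch inspects only the first file group of the files partition; the flatMap here scans all of them, which amounts to the same thing while that partition holds a single file group.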
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
index f431283ac7a..2fc69ed5887 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
@@ -94,7 +94,7 @@ public class HoodieMetadataWriteUtils {
.withCleanerParallelism(parallelism)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
.withFailedWritesCleaningPolicy(failedWritesCleaningPolicy)
- .retainCommits(Math.min(writeConfig.getCleanerCommitsRetained(), DEFAULT_METADATA_CLEANER_COMMITS_RETAINED))
+ .retainCommits(10)
.build())
// we will trigger archive manually, to ensure only regular writer invokes it
.withArchivalConfig(HoodieArchivalConfig.newBuilder()
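The hardcoded retainCommits(10) above replaces the Math.min cap and matches the new DEFAULT_METADATA_CLEANER_COMMITS_RETAINED = 10 further down in this gist. A minimal sketch of the cleaner block in isolation, assuming the enclosing builder is HoodieCleanConfig.newBuilder(), which this hunk does not show:

// cleaner settings for the metadata table writer; parallelism and
// failedWritesCleaningPolicy are assumed in scope as in the patch
HoodieCleanConfig cleanConfig = HoodieCleanConfig.newBuilder()
    .withCleanerParallelism(parallelism)
    .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
    .withFailedWritesCleaningPolicy(failedWritesCleaningPolicy)
    // previously: Math.min(writeConfig.getCleanerCommitsRetained(), DEFAULT_METADATA_CLEANER_COMMITS_RETAINED)
    .retainCommits(10)
    .build();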
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/LogCompactionExecutionHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/LogCompactionExecutionHelper.java
index caeac4d322d..c18e28c9393 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/LogCompactionExecutionHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/LogCompactionExecutionHelper.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieCompactionHandler;
import org.apache.hudi.table.HoodieTable;
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index fa654f2e706..08844604f88 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -1596,9 +1596,11 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
* Test several table operations with restore. This test uses SparkRDDWriteClient.
* Once restore support is ready in HoodieTestTable, rewrite this test.
*/
- @ParameterizedTest
- @EnumSource(HoodieTableType.class)
- public void testTableOperationsWithRestore(HoodieTableType tableType) throws Exception {
+ //@ParameterizedTest
+ //@EnumSource(HoodieTableType.class)
+ @Test
+ public void testTableOperationsWithRestore() throws Exception {
+ HoodieTableType tableType = MERGE_ON_READ;
init(tableType);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
@@ -1856,13 +1858,18 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
// Deletes
newCommitTime = "20210101000900000";
- records = dataGen.generateDeletes(newCommitTime, 10);
+ /*records = dataGen.generateDeletes(newCommitTime, 10);
JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
client.startCommitWithTime(newCommitTime);
- client.delete(deleteKeys, newCommitTime);
+ client.delete(deleteKeys, newCommitTime);*/
+
+ client.startCommitWithTime(newCommitTime);
+ records = dataGen.generateUpdates(newCommitTime, 5);
+ writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+ assertNoWriteErrors(writeStatuses);
// Clean
- newCommitTime = "20210101000900000";
+ newCommitTime = "20210101001000000";
client.clean(newCommitTime);
validateMetadata(client);
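Restated outside the diff markers: the test swaps the delete commit for a second round of upserts and moves the clean to a fresh instant time so it no longer collides with the previous commit. A sketch using only calls already present in this test:

// upsert 5 updates in place of the commented-out delete commit
String newCommitTime = "20210101000900000";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 5);
List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);

// clean at a later, unique instant time
newCommitTime = "20210101001000000";
client.clean(newCommitTime);
validateMetadata(client);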
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
index 7a3006277e9..9500bbd98ff 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
@@ -342,6 +342,7 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
.enable(useFileListingMetadata)
.enableMetrics(enableMetrics)
.ignoreSpuriousDeletes(validateMetadataPayloadConsistency)
+ .withMaxNumDeltaCommitsBeforeCompaction(4)
.build())
.withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)
.withExecutorMetrics(true).build())
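With withMaxNumDeltaCommitsBeforeCompaction(4), the test harness compacts the MDT every four deltacommits, which keeps base files close to any instant a restore might target. A sketch of the metadata config in isolation, assuming HoodieMetadataConfig.newBuilder() as the entry point:

// metadata table config for the test harness; the flags are the same
// variables used in the surrounding builder chain
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
    .enable(useFileListingMetadata)
    .enableMetrics(enableMetrics)
    .ignoreSpuriousDeletes(validateMetadataPayloadConsistency)
    .withMaxNumDeltaCommitsBeforeCompaction(4) // compact the MDT aggressively in tests
    .build();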
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 63e0d5dbe1b..977706c5bbc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -49,7 +49,7 @@ public final class HoodieMetadataConfig extends HoodieConfig {
// Meta fields are not populated by default for metadata table
public static final boolean DEFAULT_METADATA_POPULATE_META_FIELDS = false;
// Default number of commits to retain, without cleaning, on metadata table
- public static final int DEFAULT_METADATA_CLEANER_COMMITS_RETAINED = 3;
+ public static final int DEFAULT_METADATA_CLEANER_COMMITS_RETAINED = 10;
public static final String METADATA_PREFIX = "hoodie.metadata";
public static final String OPTIMIZED_LOG_BLOCKS_SCAN = ".optimized.log.blocks.scan.enable";
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index 6186010ab74..51fe34badd2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -459,6 +459,10 @@ public interface HoodieTimeline extends Serializable {
return instant.isRequested() ? instant : HoodieTimeline.getRequestedInstant(instant);
}
+ static HoodieInstant getRestoreRequestedInstant(HoodieInstant instant) {
+ return instant.isRequested() ? instant : HoodieTimeline.getRequestedInstant(instant);
+ }
+
static HoodieInstant getIndexRequestedInstant(final String timestamp) {
return new HoodieInstant(State.REQUESTED, INDEXING_ACTION, timestamp);
}
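The new accessor simply delegates to the generic requested-instant helper, mirroring the method directly above it; its value is documenting intent at restore call sites. A usage sketch with an illustrative (hypothetical) instant time:

// resolve the REQUESTED form of a restore instant, which is where the restore plan lives
HoodieInstant restore = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.RESTORE_ACTION, "20230710193100000");
HoodieInstant requested = HoodieTimeline.getRestoreRequestedInstant(restore); // already REQUESTED, returned as-is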
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index cc4f1dd5e32..b0d2b4aa304 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -90,6 +90,7 @@ import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -1286,6 +1287,11 @@ public class HoodieTableMetadataUtil {
validInstantTimestamps.addAll(getRollbackedCommits(instant, datasetTimeline));
});
+ // add restore instants from MDT.
+ metadataMetaClient.getActiveTimeline().getRollbackAndRestoreTimeline().filterCompletedInstants()
+ .filter(instant -> instant.getAction().equals(HoodieTimeline.RESTORE_ACTION))
+ .getInstants().forEach(instant -> validInstantTimestamps.add(instant.getTimestamp()));
+
// SOLO_COMMIT_TIMESTAMP is used during bootstrap so it is a valid timestamp
validInstantTimestamps.add(createIndexInitTimestamp(SOLO_COMMIT_TIMESTAMP, PARTITION_INITIALIZATION_TIME_SUFFIX));
return validInstantTimestamps;
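The block added above widens the valid-timestamp set with completed restores from the MDT's own timeline, so log blocks written by those restores survive filtering when the metadata table is read. The same filter in isolation, with metadataMetaClient in scope and validInstantTimestamps a Set<String> as in the method:

// completed restore instants on the MDT timeline count as valid commits for MDT readers
metadataMetaClient.getActiveTimeline().getRollbackAndRestoreTimeline().filterCompletedInstants()
    .filter(instant -> instant.getAction().equals(HoodieTimeline.RESTORE_ACTION))
    .getInstants()
    .forEach(instant -> validInstantTimestamps.add(instant.getTimestamp()));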