-
-
Save bleskes/9177f512ebe803dc07dbfdda6f8ef2b7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java | |
index 5bb26b2..6c1f24c 100644 | |
--- a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java | |
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java | |
@@ -41,7 +41,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; | |
import org.elasticsearch.index.IndexNotFoundException; | |
import org.elasticsearch.index.engine.RecoveryEngineException; | |
import org.elasticsearch.index.mapper.MapperException; | |
-import org.elasticsearch.index.seqno.SequenceNumbers; | |
import org.elasticsearch.index.seqno.SequenceNumbersService; | |
import org.elasticsearch.index.shard.IllegalIndexShardStateException; | |
import org.elasticsearch.index.shard.IndexEventListener; | |
@@ -62,11 +61,9 @@ import org.elasticsearch.transport.TransportResponse; | |
import org.elasticsearch.transport.TransportService; | |
import java.io.IOException; | |
-import java.util.List; | |
import java.util.Optional; | |
import java.util.concurrent.atomic.AtomicLong; | |
import java.util.concurrent.atomic.AtomicReference; | |
-import java.util.function.Function; | |
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; | |
@@ -176,17 +173,20 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde | |
return; | |
} | |
final RecoveryTarget recoveryTarget = recoveryRef.target(); | |
- assert recoveryTarget.sourceNode() != null : "can't do a recovery without a source node"; | |
- | |
- final Optional<StartRecoveryRequest> maybeRequest = getStartRecoveryRequest(recoveryTarget); | |
- if (!maybeRequest.isPresent()) return; | |
- else request = maybeRequest.get(); | |
- | |
cancellableThreads = recoveryTarget.cancellableThreads(); | |
timer = recoveryTarget.state().getTimer(); | |
- | |
- logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); | |
- recoveryTarget.indexShard().prepareForIndexRecovery(); | |
+ try { | |
+ assert recoveryTarget.sourceNode() != null : "can't do a recovery without a source node"; | |
+ request = getStartRecoveryRequest(recoveryTarget); | |
+ logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); | |
+ recoveryTarget.indexShard().prepareForIndexRecovery(); | |
+ } catch (Exception e) { | |
+ // this will be logged as warning later on... | |
+ logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e); | |
+ onGoingRecoveries.failRecovery(recoveryId, | |
+ new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), true); | |
+ return; | |
+ } | |
} | |
try { | |
@@ -290,7 +290,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde | |
} | |
@SuppressWarnings("OptionalUsedAsFieldOrParameterType") | |
- private static Optional<Store.MetadataSnapshot> EMPTY_METADATA_SNAPSHOT = Optional.of(Store.MetadataSnapshot.EMPTY); | |
+ private static Store.MetadataSnapshot EMPTY_METADATA_SNAPSHOT = Store.MetadataSnapshot.EMPTY; | |
/** | |
* Obtains a snapshot of the store metadata for the recovery target, or an empty {@link Optional} if obtaining the store metadata | |
@@ -299,14 +299,14 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde | |
* @param recoveryTarget the target of the recovery | |
* @return a snapshot of the store metadata, or an empty {@link Optional} | |
*/ | |
- private Optional<Store.MetadataSnapshot> getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget) { | |
+ private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget) { | |
try { | |
if (recoveryTarget.indexShard().indexSettings().isOnSharedFilesystem()) { | |
// we are not going to copy any files, so don't bother listing files, potentially running into concurrency issues with the | |
// primary changing files underneath us | |
return EMPTY_METADATA_SNAPSHOT; | |
} else { | |
- return Optional.of(recoveryTarget.indexShard().snapshotStoreMetadata()); | |
+ return recoveryTarget.indexShard().snapshotStoreMetadata(); | |
} | |
} catch (org.apache.lucene.index.IndexNotFoundException e) { | |
// happens on an empty folder. no need to log | |
@@ -315,12 +315,6 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde | |
} catch (final IOException e) { | |
logger.warn("error while listing local files, recover as if there are none", e); | |
return EMPTY_METADATA_SNAPSHOT; | |
- } catch (final Exception e) { | |
- // this will be logged as warning later on... | |
- logger.trace("unexpected error while listing local files, failing recovery", e); | |
- onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(), | |
- new RecoveryFailedException(recoveryTarget.state(), "failed to list local files", e), true); | |
- return Optional.empty(); | |
} | |
} | |
@@ -330,49 +324,39 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde | |
* @param recoveryTarget the target of the recovery | |
* @return a start recovery request, or an empty {@link Optional} | |
*/ | |
- private Optional<StartRecoveryRequest> getStartRecoveryRequest(final RecoveryTarget recoveryTarget) { | |
+ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recoveryTarget) { | |
final StartRecoveryRequest request; | |
logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode()); | |
- final Optional<Store.MetadataSnapshot> metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget); | |
- if (!metadataSnapshot.isPresent()) return Optional.empty(); | |
- logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.get().size()); | |
+ final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget); | |
+ logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size()); | |
- try { | |
- final long startingSeqNo; | |
- if (metadataSnapshot.get().size() > 0) { | |
- startingSeqNo = getStartingSeqNo(recoveryTarget); | |
- } else { | |
- startingSeqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; | |
- } | |
- | |
- if (startingSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) { | |
- logger.trace("{} preparing for file-based recovery from [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode()); | |
- } else { | |
- logger.trace( | |
- "{} preparing for sequence number-based recovery starting at local checkpoint [{}] from [{}]", | |
- recoveryTarget.shardId(), | |
- startingSeqNo, | |
- recoveryTarget.sourceNode()); | |
- } | |
+ final long startingSeqNo; | |
+ if (metadataSnapshot.size() > 0) { | |
+ startingSeqNo = getStartingSeqNo(recoveryTarget); | |
+ } else { | |
+ startingSeqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; | |
+ } | |
- request = new StartRecoveryRequest( | |
+ if (startingSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) { | |
+ logger.trace("{} preparing for file-based recovery from [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode()); | |
+ } else { | |
+ logger.trace( | |
+ "{} preparing for sequence number-based recovery starting at local checkpoint [{}] from [{}]", | |
recoveryTarget.shardId(), | |
- recoveryTarget.sourceNode(), | |
- clusterService.localNode(), | |
- metadataSnapshot.get(), | |
- recoveryTarget.state().getPrimary(), | |
- recoveryTarget.recoveryId(), | |
- startingSeqNo); | |
- | |
- } catch (final Exception e) { | |
- // this will be logged as warning later on... | |
- logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e); | |
- onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(), | |
- new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), true); | |
- return Optional.empty(); | |
+ startingSeqNo, | |
+ recoveryTarget.sourceNode()); | |
} | |
- return Optional.of(request); | |
+ | |
+ request = new StartRecoveryRequest( | |
+ recoveryTarget.shardId(), | |
+ recoveryTarget.sourceNode(), | |
+ clusterService.localNode(), | |
+ metadataSnapshot, | |
+ recoveryTarget.state().getPrimary(), | |
+ recoveryTarget.recoveryId(), | |
+ startingSeqNo); | |
+ return request; | |
} | |
/** |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java | |
index 5bb26b2..6c1f24c 100644 | |
--- a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java | |
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java | |
@@ -41,7 +41,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; | |
import org.elasticsearch.index.IndexNotFoundException; | |
import org.elasticsearch.index.engine.RecoveryEngineException; | |
import org.elasticsearch.index.mapper.MapperException; | |
-import org.elasticsearch.index.seqno.SequenceNumbers; | |
import org.elasticsearch.index.seqno.SequenceNumbersService; | |
import org.elasticsearch.index.shard.IllegalIndexShardStateException; | |
import org.elasticsearch.index.shard.IndexEventListener; | |
@@ -62,11 +61,9 @@ import org.elasticsearch.transport.TransportResponse; | |
import org.elasticsearch.transport.TransportService; | |
import java.io.IOException; | |
-import java.util.List; | |
import java.util.Optional; | |
import java.util.concurrent.atomic.AtomicLong; | |
import java.util.concurrent.atomic.AtomicReference; | |
-import java.util.function.Function; | |
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; | |
@@ -176,17 +173,20 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde | |
return; | |
} | |
final RecoveryTarget recoveryTarget = recoveryRef.target(); | |
- assert recoveryTarget.sourceNode() != null : "can't do a recovery without a source node"; | |
- | |
- final Optional<StartRecoveryRequest> maybeRequest = getStartRecoveryRequest(recoveryTarget); | |
- if (!maybeRequest.isPresent()) return; | |
- else request = maybeRequest.get(); | |
- | |
cancellableThreads = recoveryTarget.cancellableThreads(); | |
timer = recoveryTarget.state().getTimer(); | |
- | |
- logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); | |
- recoveryTarget.indexShard().prepareForIndexRecovery(); | |
+ try { | |
+ assert recoveryTarget.sourceNode() != null : "can't do a recovery without a source node"; | |
+ request = getStartRecoveryRequest(recoveryTarget); | |
+ logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); | |
+ recoveryTarget.indexShard().prepareForIndexRecovery(); | |
+ } catch (Exception e) { | |
+ // this will be logged as warning later on... | |
+ logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e); | |
+ onGoingRecoveries.failRecovery(recoveryId, | |
+ new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), true); | |
+ return; | |
+ } | |
} | |
try { | |
@@ -290,7 +290,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde | |
} | |
@SuppressWarnings("OptionalUsedAsFieldOrParameterType") | |
- private static Optional<Store.MetadataSnapshot> EMPTY_METADATA_SNAPSHOT = Optional.of(Store.MetadataSnapshot.EMPTY); | |
+ private static Store.MetadataSnapshot EMPTY_METADATA_SNAPSHOT = Store.MetadataSnapshot.EMPTY; | |
/** | |
* Obtains a snapshot of the store metadata for the recovery target, or an empty {@link Optional} if obtaining the store metadata | |
@@ -299,14 +299,14 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde | |
* @param recoveryTarget the target of the recovery | |
* @return a snapshot of the store metadata, or an empty {@link Optional} | |
*/ | |
- private Optional<Store.MetadataSnapshot> getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget) { | |
+ private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget) { | |
try { | |
if (recoveryTarget.indexShard().indexSettings().isOnSharedFilesystem()) { | |
// we are not going to copy any files, so don't bother listing files, potentially running into concurrency issues with the | |
// primary changing files underneath us | |
return EMPTY_METADATA_SNAPSHOT; | |
} else { | |
- return Optional.of(recoveryTarget.indexShard().snapshotStoreMetadata()); | |
+ return recoveryTarget.indexShard().snapshotStoreMetadata(); | |
} | |
} catch (org.apache.lucene.index.IndexNotFoundException e) { | |
// happens on an empty folder. no need to log | |
@@ -315,12 +315,6 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde | |
} catch (final IOException e) { | |
logger.warn("error while listing local files, recover as if there are none", e); | |
return EMPTY_METADATA_SNAPSHOT; | |
- } catch (final Exception e) { | |
- // this will be logged as warning later on... | |
- logger.trace("unexpected error while listing local files, failing recovery", e); | |
- onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(), | |
- new RecoveryFailedException(recoveryTarget.state(), "failed to list local files", e), true); | |
- return Optional.empty(); | |
} | |
} | |
@@ -330,49 +324,39 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde | |
* @param recoveryTarget the target of the recovery | |
* @return a start recovery request, or an empty {@link Optional} | |
*/ | |
- private Optional<StartRecoveryRequest> getStartRecoveryRequest(final RecoveryTarget recoveryTarget) { | |
+ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recoveryTarget) { | |
final StartRecoveryRequest request; | |
logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode()); | |
- final Optional<Store.MetadataSnapshot> metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget); | |
- if (!metadataSnapshot.isPresent()) return Optional.empty(); | |
- logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.get().size()); | |
+ final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget); | |
+ logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size()); | |
- try { | |
- final long startingSeqNo; | |
- if (metadataSnapshot.get().size() > 0) { | |
- startingSeqNo = getStartingSeqNo(recoveryTarget); | |
- } else { | |
- startingSeqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; | |
- } | |
- | |
- if (startingSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) { | |
- logger.trace("{} preparing for file-based recovery from [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode()); | |
- } else { | |
- logger.trace( | |
- "{} preparing for sequence number-based recovery starting at local checkpoint [{}] from [{}]", | |
- recoveryTarget.shardId(), | |
- startingSeqNo, | |
- recoveryTarget.sourceNode()); | |
- } | |
+ final long startingSeqNo; | |
+ if (metadataSnapshot.size() > 0) { | |
+ startingSeqNo = getStartingSeqNo(recoveryTarget); | |
+ } else { | |
+ startingSeqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; | |
+ } | |
- request = new StartRecoveryRequest( | |
+ if (startingSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) { | |
+ logger.trace("{} preparing for file-based recovery from [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode()); | |
+ } else { | |
+ logger.trace( | |
+ "{} preparing for sequence number-based recovery starting at local checkpoint [{}] from [{}]", | |
recoveryTarget.shardId(), | |
- recoveryTarget.sourceNode(), | |
- clusterService.localNode(), | |
- metadataSnapshot.get(), | |
- recoveryTarget.state().getPrimary(), | |
- recoveryTarget.recoveryId(), | |
- startingSeqNo); | |
- | |
- } catch (final Exception e) { | |
- // this will be logged as warning later on... | |
- logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e); | |
- onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(), | |
- new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), true); | |
- return Optional.empty(); | |
+ startingSeqNo, | |
+ recoveryTarget.sourceNode()); | |
} | |
- return Optional.of(request); | |
+ | |
+ request = new StartRecoveryRequest( | |
+ recoveryTarget.shardId(), | |
+ recoveryTarget.sourceNode(), | |
+ clusterService.localNode(), | |
+ metadataSnapshot, | |
+ recoveryTarget.state().getPrimary(), | |
+ recoveryTarget.recoveryId(), | |
+ startingSeqNo); | |
+ return request; | |
} | |
/** |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment